/**
  DynamoDB client routines

  Provides a thin client for the DynamoDB JSON API: a `DynamoDB` client
  object, `Table` handles for item-level operations, and an `Item` type that
  maps attribute names to `Variant` values which are converted to and from
  DynamoDB's type-tagged JSON representation.
 */

module vibe.aws.dynamodb;

import std.algorithm;
import std.array;
import std.base64;
import std.conv;
import std.container;
import std.stdio;
import std.string;
import std.traits;
import std.variant;

import vibe.data.json;

import vibe.aws.aws;
import vibe.aws.morejson;

/// Set-of-strings type used to represent DynamoDB "SS" attributes.
public alias StringSet = RedBlackTree!string;
/// Factory for `StringSet` values (no duplicates allowed).
public alias stringSet = redBlackTree!(false, string);

/// Parameters for the CreateTable call; serialized to JSON field-by-field.
struct CreateTableParams
{
    /// Name and type of one attribute of the table.
    struct AttributeDefinition
    {
        string AttributeName;
        string AttributeType;
    }

    /// Name and key type of one element of the key schema.
    struct KeyDefinition
    {
        string AttributeName;
        string KeyType;
    }

    /// Provisioned read/write capacity.
    struct ProvisionDefinition
    {
        long ReadCapacityUnits;
        long WriteCapacityUnits;
    }

    string TableName;
    AttributeDefinition[] AttributeDefinitions;
    KeyDefinition[] KeySchema;
    ProvisionDefinition ProvisionedThroughput;
}

/**
  Throttled by Control Plane API
 */
class ThrottlingException : AWSException
{
    this(string type, string message)
    {
        // Always constructed as retriable.
        super(type, true, message);
    }
}

/**
  Throttled by Data Plane API
 */
class ProvisionedThroughputExceededException : AWSException
{
    this(string type, string message)
    {
        // Always constructed as retriable.
        super(type, true, message);
    }
}

/**
  Thrown by `Table.get` when the response contains no item for the key
 */
class ItemNotFoundException : AWSException
{
    this(string message)
    {
        // Not retriable.
        super("ItemNotFoundException", false, message);
    }
}

/**
  Base DynamoDB client instance

  Construct an instance of this class for the appropriate region, pass in a
  credentials source, and grab a Table instance to do actual work.
 */
class DynamoDB : AWSClient
{
    private static immutable string apiVersion = "DynamoDB_20120810";

    /// Create a client for the public DynamoDB endpoint of the given region.
    this(string region, AWSCredentialSource credsSource)
    {
        super("https://dynamodb." ~ region ~ ".amazonaws.com", region, "dynamodb", credsSource);
    }

    /// Create a client for an explicit endpoint, using empty static credentials.
    this(string endpoint = "http://localhost:8000")
    {
        // NOTE(review): `region` is not a parameter of this constructor; it
        // presumably resolves to a member inherited from AWSClient that has
        // not been initialised at this point -- confirm which region value
        // is intended for custom endpoints.
        super(endpoint, region, "dynamodb", new StaticAWSCredentials("", ""));
    }

    /**
      Map DynamoDB throttling error types onto dedicated exception classes;
      everything else is delegated to the base class.
     */
    override AWSException makeException(string type, bool retriable, string message)
    {
        // Recognize some specific DynamoDB exceptions that are actually
        // retriable (unlike their error code would suggest)
        if (type == exceptionPrefix ~ "ThrottlingException")
            return new ThrottlingException(type, message);
        if (type == exceptionPrefix ~ "ProvisionedThroughputExceededException")
            return new ProvisionedThroughputExceededException(type, message);
        return super.makeException(type, retriable, message);
    }

    /// Issue a request for `command`, prefixed with the API version.
    private auto ddbRequest(string command, Json requestBody)
    {
        return doRequest(apiVersion ~ "." ~ command, requestBody);
    }

    /// Get a handle to the table with the given name (no request is made).
    @property Table table(string name)
    {
        return new Table(name, this);
    }

    /// List the tables as `Table` handles (single ListTables request).
    @property auto tables()
    {
        // FIXME: continuation token for > 100 tables, but who actually needs that...
        auto resp = ddbRequest("ListTables", Json.emptyObject);
        return resp.responseBody["TableNames"].arrayIterator().map!(t => table(t.get!string));
    }

    /// List the table names (single ListTables request).
    @property auto tableNames()
    {
        // FIXME: continuation token for > 100 tables, but who actually needs that...
        auto resp = ddbRequest("ListTables", Json.emptyObject);
        return resp.responseBody["TableNames"].arrayIterator().map!(t => t.get!string);
    }

    /// Issue a CreateTable request with the serialized parameters.
    auto createTable(CreateTableParams params)
    {
        return ddbRequest("CreateTable", serializeToJson(params));
    }
}

/**
  A DynamoDB table
 */
class Table
{
    immutable string name;

    private DynamoDB m_client;

    this(string name, DynamoDB client)
    {
        this.name = name;
        m_client = client;
    }

    /// Store an item unconditionally (PutItem).
    void put(in Item item)
    {
        m_client.ddbRequest("PutItem", Json(["TableName"
                : Json(this.name), "Item" : item.ddbJson]));
    }

    /**
      Store an item subject to a condition expression (PutItem).

      `attributeValues` is sent as the ExpressionAttributeValues of the
      request.
     */
    void put(in Item item, string conditionExpression, Item attributeValues)
    {
        m_client.ddbRequest("PutItem", Json(["TableName" : Json(this.name), "Item" : item.ddbJson,
                "ConditionExpression" : Json(conditionExpression),
                "ExpressionAttributeValues" : attributeValues.ddbJson]));
    }

    /// Fetch an item by hash key. Throws ItemNotFoundException if absent.
    Item get(T)(string hashKey, T hashValue)
    {
        return get(Json([hashKey : variantToDDB(toVariant(hashValue))]));
    }

    /// Fetch an item by hash and range key. Throws ItemNotFoundException if absent.
    Item get(T, U)(string hashKey, T hashValue, string rangeKey, U rangeValue)
    {
        return get(Json([hashKey : variantToDDB(toVariant(hashValue)), rangeKey
                : variantToDDB(toVariant(rangeValue))]));
    }

    /// GetItem with an already type-tagged key object.
    private Item get(Json key)
    {
        auto resp = m_client.ddbRequest("GetItem", Json(["TableName"
                : Json(this.name), "Key" : key]));

        // A response without an "Item" member means nothing matched the key.
        auto itemKey = "Item" in resp.responseBody;
        if (!itemKey)
            throw new ItemNotFoundException("No item with key " ~ key.toString());

        Item ret;
        auto obj = itemKey.get!(Json[string]);
        foreach (k; obj.byKey)
        {
            ret.attrs[k] = DDBtoVariant(obj[k]);
        }

        return ret;
    }

    /// Delete an item by hash key (DeleteItem).
    void del(T)(string hashKey, T hashValue)
    {
        return del(Json([hashKey : variantToDDB(toVariant(hashValue))]));
    }

    /// Delete an item by hash and range key (DeleteItem).
    void del(T, U)(string hashKey, T hashValue, string rangeKey, U rangeValue)
    {
        return del(Json([hashKey : variantToDDB(toVariant(hashValue)), rangeKey
                : variantToDDB(toVariant(rangeValue))]));
    }

    /// DeleteItem with an already type-tagged key object.
    private void del(Json key)
    {
        m_client.ddbRequest("DeleteItem", Json(["TableName"
                : Json(this.name), "Key" : key]));
    }
}

/**
  Convert a D variant value to a type-tagged DDB value

  Supported: empty Variant (NULL), string (S), ubyte[] (B), bool (BOOL),
  numeric types (N, sent as a string), StringSet (SS) and Variant[string] (M).

  Throws: Exception for any other held type.
 */
private Json variantToDDB(Variant v)
{
    auto ret = Json.emptyObject;
    if (!v.hasValue)
        ret["NULL"] = true;
    else if (v.type == typeid(string))
        ret["S"] = Json(v.get!string);
    else if (v.type == typeid(ubyte[]))
        ret["B"] = Json(Base64.encode(v.get!(ubyte[])));
    else if (v.type == typeid(bool))
        // BOOL is a JSON boolean on the wire, not the string "true"/"false".
        ret["BOOL"] = Json(v.get!bool);
    else if (v.convertsTo!double || v.convertsTo!ulong || v.convertsTo!long)
        // Numbers are transmitted as strings.
        ret["N"] = Json(v.coerce!string);
    else if (v.type == typeid(StringSet))
    {
        auto strings = v.get!StringSet;
        ret["SS"] = Json(strings[].map!(s => Json(s)).array());
    }
    else if (v.type == typeid(Variant[string]))
    {
        // Nested map: convert every member recursively.
        auto o = Json.emptyObject;
        auto value = v.get!(Variant[string]);
        foreach (name; value.byKey())
            o[name] = variantToDDB(value[name]);
        ret["M"] = o;
    }
    else
        throw new Exception("Unsupported DynamoDB value type: " ~ v.type.to!string);

    // FIXME: support more types

    return ret;
}

/**
  Parse the textual form of a DDB number into the narrowest fitting type

  Text containing a fraction or an exponent becomes a double; otherwise int
  is tried first, then long, and finally double for integers that overflow
  long.
 */
private Variant parseDDBNumber(string text)
{
    immutable isFloatingPoint = text.indexOf(".") != -1
        || text.indexOf("e") != -1 || text.indexOf("E") != -1;
    if (isFloatingPoint)
        return Variant(text.to!double);

    try
    {
        return Variant(text.to!int);
    }
    catch (ConvOverflowException)
    {
        try
        {
            return Variant(text.to!long);
        }
        catch (ConvOverflowException)
        {
            // Wider than 64 bits: best effort as floating point.
            return Variant(text.to!double);
        }
    }
}

/**
  Convert a type-tagged DDB value to a D variant value

  Inverse of `variantToDDB` for the NULL, S, B, BOOL, N, SS and M tags.

  Throws: Exception if the object carries none of the supported tags.
 */
private Variant DDBtoVariant(Json obj)
{
    if ("NULL" in obj)
        return Variant();
    if (auto pv = "S" in obj)
        return Variant(pv.get!string);
    if (auto pv = "B" in obj)
        return Variant(Base64.decode(pv.get!string));
    if (auto pv = "BOOL" in obj)
    {
        // Accept a JSON boolean as well as the string form that earlier
        // versions of this module produced.
        if (pv.type == Json.Type.bool_)
            return Variant(pv.get!bool);
        return Variant(pv.get!string == "true");
    }
    if (auto pv = "N" in obj)
    {
        // Number, but which type?
        return parseDDBNumber(pv.get!string);
    }
    if (auto pv = "SS" in obj)
    {
        StringSet ss = stringSet();
        foreach (x; arrayIterator(*pv))
        {
            ss.stableInsert(x.get!string);
        }
        return Variant(ss);
    }
    if (auto pv = "M" in obj)
    {
        // Object
        Json[string] src = pv.get!(Json[string]);
        Variant[string] dest;
        foreach (k; src.byKey)
        {
            dest[k] = DDBtoVariant(src[k]);
        }
        return Variant(dest);
    }

    throw new Exception("Unsupported DynamoDB value: " ~ obj.toString());
}

unittest
{
    assert(variantToDDB(Variant("foo"))["S"] == Json("foo"));
    assert(variantToDDB(Variant(3))["N"] == Json("3"));
    assert(variantToDDB(Variant(3.14))["N"] == Json("3.14"));
    assert(variantToDDB(Variant(true))["BOOL"] == Json(true));
    assert(variantToDDB(Variant(stringSet("a", "b")))["SS"] == Json([Json("a"), Json("b")]));
    //assert(variantToDDB(Variant(["a", "b"]))["SS"] == Json([Json("a"), Json("b")]));
}

unittest
{
    // Booleans: JSON boolean and legacy string form
    assert(DDBtoVariant(Json(["BOOL" : Json(true)])).get!bool);
    assert(!DDBtoVariant(Json(["BOOL" : Json(false)])).get!bool);
    assert(DDBtoVariant(Json(["BOOL" : Json("true")])).get!bool);

    // Numbers: narrowest fitting type
    assert(DDBtoVariant(Json(["N" : Json("3")])).get!int == 3);
    assert(DDBtoVariant(Json(["N" : Json("12345678901")])).get!long == 12345678901L);
    assert(DDBtoVariant(Json(["N" : Json("1E+2")])).get!double == 100.0);
}

/// Wrap an arbitrary value in a Variant.
private Variant toVariant(T)(T value)
{
    return Variant(value);
}

/// A Variant is passed through unchanged rather than being wrapped again.
private Variant toVariant(Variant v)
{
    return v;
}

/**
  A DynamoDB item
 */
struct Item
{
    /// Attribute values by attribute name.
    Variant[string] attrs;

    /// Set an attribute to a non-associative-array value. Returns this item.
    ref Item set(T)(string key, T value) if (!isAssociativeArray!T)
    {
        attrs[key] = toVariant(value);
        return this;
    }

    /// `key in item`: pointer to the attribute value, or null if absent.
    inout(Variant)* opBinaryRight(string op)(string other) inout if (op == "in")
    {
        return other in attrs;
    }

    /// Remove an attribute. Returns this item.
    ref Item unset(string key)
    {
        attrs.remove(key);
        return this;
    }

    /**
      Set an attribute to a string-keyed associative array; every member is
      converted to a Variant so the result is stored as Variant[string].
      Returns this item.
     */
    ref Item set(T)(string key, T[string] values)
    {
        Variant[string] obj;
        foreach (k; values.byKey)
        {
            obj[k] = toVariant(values[k]);
        }
        attrs[key] = obj;

        return this;
    }

    /**
      Convert the item to a DDB JSON representation
     */
    @property Json ddbJson() const
    {
        auto jsonItem = Json.emptyObject;
        foreach (a; attrs.byKey())
        {
            jsonItem[a] = variantToDDB(attrs[a]);
        }
        return jsonItem;
    }

    /// `item[key]`: the attribute value (the key must exist).
    Variant opIndex(string key)
    {
        return attrs[key];
    }

    /**
      `item[key] = value`

      D rewrites `a[i] = v` to `a.opIndexAssign(v, i)`, so the value comes
      first. Delegates to `set` so both forms store values identically.
     */
    void opIndexAssign(T)(T value, string key)
    {
        set(key, value);
    }
}

unittest
{
    Item u;
    u.set("set", stringSet());
    u["set"].get!StringSet.stableInsert("hoi");
    assert("hoi" in u["set"].get!StringSet);
}

unittest
{
    // Index assignment stores the value under the indexed key
    Item u;
    u["name"] = "value";
    assert("name" in u);
    assert(u["name"].get!string == "value");
    u["count"] = 3;
    assert(u["count"].get!int == 3);
}