Vert-x-通过异步的方式使用JDBC连接SQL

jopen 9年前

在这篇文章中,我们将会看到怎样在vert.x应用中使用 HSQL ,当然也可以使用任意JDBC,以及使用vertx-jdbc-client提供的异步的API,这篇文章的代码在 github 上。

异步?

vert.x一个很重要的特点就是它的异步性。使用异步的API,不需要等结果返回,当有结果返回时,vert.x会主动通知。为了说明这个,我们来看一个简单的例子。

我们假设有个 add 方法。一般来说,会像 int r = add(1, 1) 这样来使用它。这是一个同步的API,所以你必须等到返回结果。异步的API会是这样: add(1, 1, r -> { /*do something with the result*/}) 。在这个版本中,你传入了一个Handler,当结果计算出来时才被调用。这个方法不返回任何东西,实现如下:

public void add(int a, int b, Handler<Integer> resultHandler) {      int r = a + b;      resultHandler.handle(r);  }

为了避免混淆概念,异步API并不是多线程。像我们在add例子里看到的,并没有涉及多线程。

异步JDBC

看了一些基本的异步的API,现在了解下 vertx-jdbc-client 。这个组件能够让我们通过 JDBC driver 与数据库交互。这些交互都是异步的,以前这样:

String sql = "SELECT * FROM Products";  ResultSet rs = stmt.executeQuery(sql);

现在要这样:

connection.query("SELECT * FROM Products", result -> {          // do something with the result  });

这个模型更高效,当结果出来后vert.x通知,避免了等待结果。

增加maven依赖

在 pom.xml 文件中增加两个 Maven dependencies

<dependency>    <groupId>io.vertx</groupId>    <artifactId>vertx-jdbc-client</artifactId>    <version>3.1.0</version>  </dependency>  <dependency>    <groupId>org.hsqldb</groupId>    <artifactId>hsqldb</artifactId>    <version>2.3.3</version>  </dependency>

第一个依赖提供了 vertx-jdbc-client ,第二个提供了 HSQL JDBC 的驱动。如果你想使用另外一个数据库,修改这个依赖,同时你还需要修改 JDBC url 和 JDBC driver 名。

初始化JDBC client

创建JDBC 客户端(client):

在 MyFirstVerticle 类中,声明一个新变量 JDBCClient jdbc ,并且在 start 方法中添加:

jdbc = JDBCClient.createShared(vertx, config(), "My-Whisky-Collection");

创建了一个JDBC client实例,使用verticle的配置文件配置JDBC client。这个配置文件需要提供下面的配置才能让JDBC client正常工作:

  • url-JDBC url,例如: jdbc:hsqldb:mem:db?shutdown=true
  • _driver class-JDBC的驱动,例如: org.hsqldb.jdbcDriver

有了client,接下来需要连接数据库。连接数据库是通过使用 jdbc.getConnection 来实现的, jdbc.getConnection 需要传入一个 Handler<AsyncResult<SQLConnection>> 参数。我们深入的了解下这个类型。首先,这是一个 Handler ,因此当结果准备好时它就会被调用。这个结果是 AsyncResult<SQLConnection> 的一个实例。 AsyncResult 是 vert.x 提供的一个结构,使用它能够知道连接数据库的操作是成功或失败了。如果成功了,它就会提供一个结果,这里结果是一个 SQLConnection 的实例。

当你接收一个 AsyncResult 的实例时,代码通常是:

if (ar.failed()) {    System.err.println("The operation has failed...: "        + ar.cause().getMessage());  } else {    // Use the result:    result = ar.result();   }

需要获取到 SQLConnection ,然后启动 rest 的应用。因为变成了异步的,这需要改变启动应用的方式。因此,如果将启动序列划分成多块:

startBackend(   (connection) -> createSomeData(connection,       (nothing) -> startWebApp(           (http) -> completeStartup(http, fut)       ), fut   ), fut);
  • startBackend - 获取 SQLConnection 对象,然后调用下一步
  • createSomeData - 初始化数据库并插入数据。当完成后,调用下一步
  • startWebApp - 启动web应用
  • completeStartup - 最后完成启动

fut 由vert.x传入,通知已经启动或者启动过程中遇到的问题。

startBackend 方法:

private void startBackend(Handler<AsyncResult<SQLConnection>> next, Future<Void> fut) {      jdbc.getConnection(ar -> {        if (ar.failed()) {          fut.fail(ar.cause());        } else {          next.handle(Future.succeededFuture(ar.result()));        }      });    }

这个方法获取了一个SQLConnection对象,检查操作是否完成。如果成功,会调用下一步。失败了,就会报告一个错误。其他的方法遵循同样的模式:

  • 检查上一步操作是否成功
  • 处理业务逻辑
  • 调用下一步

SQL

客户端已经准备好了,现在写SQL。从 createSomeData 方法开始,这个方法也是启动顺序中的一部分:

private void createSomeData(AsyncResult<SQLConnection> result,      Handler<AsyncResult<Void>> next, Future<Void> fut) {      if (result.failed()) {        fut.fail(result.cause());      } else {        SQLConnection connection = result.result();        connection.execute(            "CREATE TABLE IF NOT EXISTS Whisky (id INTEGER IDENTITY, name varchar(100), " +            "origin varchar(100))",            ar -> {              if (ar.failed()) {                fut.fail(ar.cause());                connection.close();                return;              }              connection.query("SELECT * FROM Whisky", select -> {                if (select.failed()) {                  fut.fail(ar.cause());                  connection.close();                  return;                }                if (select.result().getNumRows() == 0) {                  insert(                      new Whisky("Bowmore 15 Years Laimrig", "Scotland, Islay"),                      connection,                      (v) -> insert(new Whisky("Talisker 57° North", "Scotland, Island"),                          connection,                          (r) -> {                            next.handle(Future.<Void>succeededFuture());                            connection.close();                          }));                                                                    } else {                  next.handle(Future.<Void>succeededFuture());                  connection.close();                }              });            });      }    }

这个方法检查 SQLConnection 是否可用,然后执行一些SQL语句。首先,如果表不存在就创建表。看看下面代码:

connection.execute(      SQL statement,      handler called when the statement has been executed  )

handler 接收 AsyncResult<Void> ,例如:只有是通知而已,没有实际返回的结果。

关闭连接

操作完成后,别忘了关闭SQL链接。这个连接会被放入连接池并且可以被重复利用。

在这个 handler 的代码里,检查了 statement 是否正确的执行了,如果正确,我们接下来检查表是否含有数据,如果没有,将会使用 insert 方法插入数据:

private void insert(Whisky whisky, SQLConnection connection, Handler<AsyncResult<Whisky>> next) {    String sql = "INSERT INTO Whisky (name, origin) VALUES ?, ?";    connection.updateWithParams(sql,        new JsonArray().add(whisky.getName()).add(whisky.getOrigin()),        (ar) -> {          if (ar.failed()) {            next.handle(Future.failedFuture(ar.cause()));            return;          }          UpdateResult result = ar.result();          // Build a new whisky instance with the generated id.          Whisky w = new Whisky(result.getKeys().getInteger(0), whisky.getName(), whisky.getOrigin());          next.handle(Future.succeededFuture(w));        });  }

这个方法使用带有INSERT(插入)statement(声明)的 upateWithParams 方法,且传入了值。这个方法避免了 SQL 注入。一旦statement执行了(当数据库没有此条数据就会创建),就创建一个新的 Whisky 对象,自动生成ID。

带有数据库(SQL)的REST

上面的方法都是启动顺序的一部分。但是,关于调用REST API的方法又是怎么样的呢?以 getAll 方法为例。这个方法被web应用前端调用,并检索存储的所有的产品:

private void getAll(RoutingContext routingContext) {      jdbc.getConnection(ar -> {        SQLConnection connection = ar.result();        connection.query("SELECT * FROM Whisky", result -> {          List<Whisky> whiskies = result.result().getRows().stream().map(Whisky::new).collect(Collectors.toList());          routingContext.response()              .putHeader("content-type", "application/json; charset=utf-8")              .end(Json.encodePrettily(whiskies));          connection.close(); // Close the connection                });      });    }

这个方法获得了一个 SQLConnection 对象,然后发出一个查询。一旦获取到查询结果,它会像之前的方法一样写 HTTP response 。 getOne 、 deleteOne 、 updateOne 和 addOne 方法都是一样的。注意,在response之后,需要要关闭SQL连接。

看下传入到query方法的handler提供的结果。获取了一个包含了查询结果的ResultSet。每一行都是一个JsonObject,因此,如果你有一个数据对象使用JsonObject作为唯一的参数,那么创建这个对象很简单。

测试

需要小小的更新下测试程序,增加配置 JDBCClient 。在 MyFirstVerticleTest 类中,将 setUp 方法中创建的 DeploymentOption 对象修改成:

DeploymentOptions options = new DeploymentOptions()          .setConfig(new JsonObject()              .put("http.port", port)              .put("url", "jdbc:hsqldb:mem:test?shutdown=true")              .put("driver_class", "org.hsqldb.jdbcDriver")          );

除了 http.port ,还配置了 JDBC url 和 JDBC 驱动。测试时,使用的是一个内存数据库。在 src/test/resources/my-it-config.json 文件中也要做同样的修改。

{    "http.port": ${http.port},    "url": "jdbc:hsqldb:mem:it-test?shutdown=true",    "driver_class": "org.hsqldb.jdbcDriver"  }

src/main/conf/my-application-conf.json 文件也同样需要修改,这不是为了测试,而是为了运行这个应用:

{    "http.port" : 8082,    "url": "jdbc:hsqldb:file:db/whiskies",    "driver_class": "org.hsqldb.jdbcDriver"  }

这里这个 JDBC url 和上一个文件的有点不一样,因为需要将数据库存储到硬盘中。

展示时间!

开始构建程序:

mvn clean package

没有修改API(没有更改发布的java文件和REST接口),测试应该是可以顺利的运行的。

启动应用:

java -jar target/my-first-app-1.0-SNAPSHOT-fat.jar -conf src/main/conf/my-application-conf.json

访问 http://localhost:8082/assets/index.html ,然后,你可以看到这个应用使用的是数据库了。这一次,就算重启应用,这些数据仍然在,因为存储产品被持久化到硬盘里了。

总结

这篇文章中,知道了怎么在 vert.x 里使用 JDBC 数据库,并没有很多复杂的东西。开始可能会被这个异步的开发模型惊讶到,但是,一旦你开始使用了,你就很难再回去了。

下一次,我们将看到这个应用怎么使用mongoDB来替换HSQL。

Stay tuned, and happy coding !

来自: http://www.jianshu.com/p/6725ac7c2143