/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.dbutils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import javax.sql.DataSource;

/**
 * Executes SQL queries with pluggable strategies for handling
 * {@code ResultSet}s. This class is thread safe.
 *
 * <p>
 * Every public operation wraps the matching {@link QueryRunner} call in a task and hands it to the
 * configured {@link ExecutorService} through {@code submit}. The JDBC work therefore runs when the
 * executor runs the task, and a failure raised by that work is reported by the returned
 * {@link Future} (see {@link Future#get()}).
 * </p>
 *
 * @see ResultSetHandler
 * @since 1.4
 */
public class AsyncQueryRunner extends AbstractQueryRunner {

    /**
     * Class that encapsulates the continuation for batch calls.
     *
     * @deprecated No longer used by this class. Will be removed in a future version.
     */
    @Deprecated
    protected class BatchCallableStatement implements Callable<int[]> {
        private final String sql;
        private final Object[][] params;
        private final Connection conn;
        private final boolean closeConn;
        private final PreparedStatement ps;

        /**
         * Creates a new BatchCallableStatement instance.
         *
         * @param sql The SQL statement to execute.
         * @param params An array of query replacement parameters.  Each row in
         *        this array is one set of batch replacement values.
         * @param conn The connection to use for the batch call.
         * @param closeConn True if the connection should be closed, false otherwise.
         * @param ps The {@link PreparedStatement} to be executed.
         */
        public BatchCallableStatement(final String sql, final Object[][] params, final Connection conn, final boolean closeConn, final PreparedStatement ps) {
            this.sql = sql;
            // Shallow copy: the outer array is detached from the caller's, the rows are shared.
            this.params = params.clone();
            this.conn = conn;
            this.closeConn = closeConn;
            this.ps = ps;
        }

        /**
         * The actual call to executeBatch.
         *
         * @return an array of update counts containing one element for each command in the batch.
         * @throws SQLException if a database access error occurs or one of the commands sent to the database fails.
         * @see PreparedStatement#executeBatch()
         */
        @Override
        public int[] call() throws SQLException {
            int[] ret = null;

            try {
                ret = ps.executeBatch();
            } catch (final SQLException e) {
                rethrow(e, sql, (Object[]) params);
            } finally {
                // Nested so that a failure while closing the statement cannot
                // prevent the connection from being closed.
                try {
                    close(ps);
                } finally {
                    if (closeConn) {
                        close(conn);
                    }
                }
            }

            return ret;
        }
    }

    /**
     * Class that encapsulates the continuation for query calls.
     *
     * @param <T> The type of the result from the call to handle.
     */
    protected class QueryCallableStatement<T> implements Callable<T> {
        private final String sql;
        private final Object[] params;
        private final Connection conn;
        private final boolean closeConn;
        private final PreparedStatement ps;
        private final ResultSetHandler<T> rsh;

        /**
         * Creates a new {@code QueryCallableStatement} instance.
         *
         * @param conn The connection to use for the query call.
         * @param closeConn True if the connection should be closed, false otherwise.
         * @param ps The {@link PreparedStatement} to be executed.
         * @param rsh The handler that converts the results into an object.
         * @param sql The SQL statement to execute.
         * @param params The query replacement parameters.
         */
        public QueryCallableStatement(final Connection conn, final boolean closeConn, final PreparedStatement ps,
                final ResultSetHandler<T> rsh, final String sql, final Object... params) {
            this.sql = sql;
            this.params = params;
            this.conn = conn;
            this.closeConn = closeConn;
            this.ps = ps;
            this.rsh = rsh;
        }

        /**
         * Executes the statement and passes its {@code ResultSet} to the {@code handle()} method.
         *
         * @return the object produced by the {@link ResultSetHandler} from the query's {@code ResultSet}.
         * @throws SQLException if a database access error occurs.
         * @see ResultSetHandler#handle(ResultSet)
         */
        @Override
        public T call() throws SQLException {
            ResultSet resultSet = null;
            T ret = null;

            try {
                resultSet = wrap(ps.executeQuery());
                ret = rsh.handle(resultSet);
            } catch (final SQLException e) {
                rethrow(e, sql, params);
            } finally {
                // Release in reverse order of acquisition; each step is guarded so
                // that a failing close does not skip the ones that follow.
                try {
                    close(resultSet);
                } finally {
                    try {
                        close(ps);
                    } finally {
                        if (closeConn) {
                            close(conn);
                        }
                    }
                }
            }

            return ret;
        }

    }

    /**
     * Class that encapsulates the continuation for update calls.
     *
     * @deprecated No longer used by this class. Will be removed in a future version.
     */
    @Deprecated
    protected class UpdateCallableStatement implements Callable<Integer> {
        private final String sql;
        private final Object[] params;
        private final Connection conn;
        private final boolean closeConn;
        private final PreparedStatement ps;

        /**
         * Creates a new {@code UpdateCallableStatement} instance.
         *
         * @param conn The connection to use for the update call.
         * @param closeConn True if the connection should be closed, false otherwise.
         * @param ps The {@link PreparedStatement} to be executed.
         * @param sql The SQL statement to execute.
         * @param params The query replacement parameters.
         */
        public UpdateCallableStatement(final Connection conn, final boolean closeConn, final PreparedStatement ps, final String sql, final Object... params) {
            this.sql = sql;
            this.params = params;
            this.conn = conn;
            this.closeConn = closeConn;
            this.ps = ps;
        }

        /**
         * The actual call to {@code executeUpdate()} method.
         *
         * @return either (1) the row count for SQL Data Manipulation Language (DML) statements or
         *                (2) 0 for SQL statements that return nothing
         * @throws SQLException if a database access error occurs.
         * @see PreparedStatement#executeUpdate()
         */
        @Override
        public Integer call() throws SQLException {
            int rows = 0;

            try {
                rows = ps.executeUpdate();
            } catch (final SQLException e) {
                rethrow(e, sql, params);
            } finally {
                // Nested so that a failure while closing the statement cannot
                // prevent the connection from being closed.
                try {
                    close(ps);
                } finally {
                    if (closeConn) {
                        close(conn);
                    }
                }
            }

            return Integer.valueOf(rows);
        }

    }

    private final ExecutorService executorService;

    private final QueryRunner queryRunner;

    /**
     * Constructor for AsyncQueryRunner that controls the use of {@code ParameterMetaData}.
     *
     * @param pmdKnownBroken Some drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) };
     * if {@code pmdKnownBroken} is set to true, we won't even try it; if false, we'll try it,
     * and if it breaks, we'll remember not to use it again.
     * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
     * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead.
     */
    @Deprecated
    public AsyncQueryRunner(final boolean pmdKnownBroken, final ExecutorService executorService) {
        this(null, pmdKnownBroken, executorService);
    }

    /**
     * Constructor for AsyncQueryRunner that takes a {@code DataSource} and controls the use of {@code ParameterMetaData}.
     * Methods that do not take a {@code Connection} parameter will retrieve connections from this
     * {@code DataSource}.
     *
     * @param ds The {@code DataSource} to retrieve connections from.
     * @param pmdKnownBroken Some drivers don't support {@link java.sql.ParameterMetaData#getParameterType(int) };
     * if {@code pmdKnownBroken} is set to true, we won't even try it; if false, we'll try it,
     * and if it breaks, we'll remember not to use it again.
     * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
     * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead.
     */
    @Deprecated
    public AsyncQueryRunner(final DataSource ds, final boolean pmdKnownBroken, final ExecutorService executorService) {
        super(ds, pmdKnownBroken);
        this.executorService = executorService;
        this.queryRunner = new QueryRunner(ds, pmdKnownBroken);
    }

    /**
     * Constructor for AsyncQueryRunner that takes a {@code DataSource}.
     *
     * Methods that do not take a {@code Connection} parameter will retrieve connections from this
     * {@code DataSource}.
     *
     * @param ds The {@code DataSource} to retrieve connections from.
     * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
     * @deprecated Use {@link #AsyncQueryRunner(ExecutorService, QueryRunner)} instead.
     */
    @Deprecated
    public AsyncQueryRunner(final DataSource ds, final ExecutorService executorService) {
        this(ds, false, executorService);
    }

    /**
     * Constructor for AsyncQueryRunner.
     *
     * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
     */
    public AsyncQueryRunner(final ExecutorService executorService) {
        this(null, false, executorService);
    }

    /**
     * Constructor for AsyncQueryRunner which uses a provided ExecutorService and underlying QueryRunner.
     *
     * @param executorService the {@code ExecutorService} instance used to run JDBC invocations concurrently.
     * @param queryRunner the {@code QueryRunner} instance to use for the queries.
     * @since 1.5
     */
    public AsyncQueryRunner(final ExecutorService executorService, final QueryRunner queryRunner) {
        this.executorService = executorService;
        this.queryRunner = queryRunner;
    }

    /**
     * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.
     *
     * @param conn The {@code Connection} to use to run the query.  The caller is
     * responsible for closing this Connection.
     * @param sql The SQL to execute.
     * @param params An array of query replacement parameters.  Each row in
     * this array is one set of batch replacement values.
     * @return A {@code Future} which returns the number of rows updated per statement.
     * @throws SQLException if a database access error occurs
     */
    public Future<int[]> batch(final Connection conn, final String sql, final Object[][] params) throws SQLException {
        return executorService.submit(() -> queryRunner.batch(conn, sql, params));
    }

    /**
     * Execute a batch of SQL INSERT, UPDATE, or DELETE queries.  The
     * {@code Connection} is retrieved from the {@code DataSource}
     * set in the constructor.  This {@code Connection} must be in
     * auto-commit mode or the update will not be saved.
     *
     * @param sql The SQL to execute.
     * @param params An array of query replacement parameters.  Each row in
     * this array is one set of batch replacement values.
     * @return A {@code Future} which returns the number of rows updated per statement.
     * @throws SQLException if a database access error occurs
     */
    public Future<int[]> batch(final String sql, final Object[][] params) throws SQLException {
        return executorService.submit(() -> queryRunner.batch(sql, params));
    }

    /**
     * Executes {@link QueryRunner#insert(Connection, String, ResultSetHandler)} asynchronously.
     *
     * @param <T> Return type expected
     * @param conn {@link Connection} to use to execute the SQL statement
     * @param sql SQL insert statement to execute
     * @param rsh {@link ResultSetHandler} for handling the results
     * @return {@link Future} that executes a query runner insert
     * @see QueryRunner#insert(Connection, String, ResultSetHandler)
     * @throws SQLException if a database access error occurs
     * @since 1.6
     */
    public <T> Future<T> insert(final Connection conn, final String sql, final ResultSetHandler<T> rsh) throws SQLException {
        return executorService.submit(() -> queryRunner.insert(conn, sql, rsh));
    }

    /**
     * Executes {@link QueryRunner#insert(Connection, String, ResultSetHandler, Object...)} asynchronously.
     *
     * @param <T> Return type expected
     * @param conn {@link Connection} to use to execute the SQL statement
     * @param sql SQL insert statement to execute
     * @param rsh {@link ResultSetHandler} for handling the results
     * @param params Parameter values for substitution in the SQL statement
     * @return {@link Future} that executes a query runner insert
     * @see QueryRunner#insert(Connection, String, ResultSetHandler, Object...)
     * @throws SQLException if a database access error occurs
     * @since 1.6
     */
    public <T> Future<T> insert(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException {
        return executorService.submit(() -> queryRunner.insert(conn, sql, rsh, params));
    }

    /**
     * Executes {@link QueryRunner#insert(String, ResultSetHandler)} asynchronously.
     *
     * @param <T> Return type expected
     * @param sql SQL insert statement to execute
     * @param rsh {@link ResultSetHandler} for handling the results
     * @return {@link Future} that executes a query runner insert
     * @see QueryRunner#insert(String, ResultSetHandler)
     * @throws SQLException if a database access error occurs
     * @since 1.6
     */
    public <T> Future<T> insert(final String sql, final ResultSetHandler<T> rsh) throws SQLException {
        return executorService.submit(() -> queryRunner.insert(sql, rsh));
    }

    /**
     * Executes {@link QueryRunner#insert(String, ResultSetHandler, Object...)} asynchronously.
     *
     * @param <T> Return type expected
     * @param sql SQL insert statement to execute
     * @param rsh {@link ResultSetHandler} for handling the results
     * @param params Parameter values for substitution in the SQL statement
     * @return {@link Future} that executes a query runner insert
     * @see QueryRunner#insert(String, ResultSetHandler, Object...)
     * @throws SQLException if a database access error occurs
     * @since 1.6
     */
    public <T> Future<T> insert(final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException {
        return executorService.submit(() -> queryRunner.insert(sql, rsh, params));
    }

    /**
     * Executes {@link QueryRunner#insertBatch(Connection, String, ResultSetHandler, Object[][])} asynchronously.
     *
     * @param <T> Return type expected
     * @param conn {@link Connection} to use to execute the SQL statement
     * @param sql SQL insert statement to execute
     * @param rsh {@link ResultSetHandler} for handling the results
     * @param params An array of query replacement parameters.  Each row in
     *        this array is one set of batch replacement values.
     * @return {@link Future} that executes a query runner batch insert
     * @see QueryRunner#insertBatch(Connection, String, ResultSetHandler, Object[][])
     * @throws SQLException if a database access error occurs
     * @since 1.6
     */
    public <T> Future<T> insertBatch(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object[][] params) throws SQLException {
        return executorService.submit(() -> queryRunner.insertBatch(conn, sql, rsh, params));
    }

    /**
     * Executes {@link QueryRunner#insertBatch(String, ResultSetHandler, Object[][])} asynchronously.
     *
     * @param <T> Return type expected
     * @param sql SQL insert statement to execute
     * @param rsh {@link ResultSetHandler} for handling the results
     * @param params An array of query replacement parameters.  Each row in
     *        this array is one set of batch replacement values.
     * @return {@link Future} that executes a query runner batch insert
     * @see QueryRunner#insertBatch(String, ResultSetHandler, Object[][])
     * @throws SQLException if a database access error occurs
     * @since 1.6
     */
    public <T> Future<T> insertBatch(final String sql, final ResultSetHandler<T> rsh, final Object[][] params) throws SQLException {
        return executorService.submit(() -> queryRunner.insertBatch(sql, rsh, params));
    }

    /**
     * Execute an SQL SELECT query without any replacement parameters.  The
     * caller is responsible for closing the connection.
     * @param <T> The type of object that the handler returns
     * @param conn The connection to execute the query in.
     * @param sql The query to execute.
     * @param rsh The handler that converts the results into an object.
     * @return A {@code Future} which returns the result of the query call.
     * @throws SQLException if a database access error occurs
     */
    public <T> Future<T> query(final Connection conn, final String sql, final ResultSetHandler<T> rsh) throws SQLException {
        return executorService.submit(() -> queryRunner.query(conn, sql, rsh));
    }

    /**
     * Execute an SQL SELECT query with replacement parameters.  The
     * caller is responsible for closing the connection.
     * @param <T> The type of object that the handler returns
     * @param conn The connection to execute the query in.
     * @param sql The query to execute.
     * @param rsh The handler that converts the results into an object.
     * @param params The replacement parameters.
     * @return A {@code Future} which returns the result of the query call.
     * @throws SQLException if a database access error occurs
     */
    public <T> Future<T> query(final Connection conn, final String sql, final ResultSetHandler<T> rsh, final Object... params)
            throws SQLException {
        return executorService.submit(() -> queryRunner.query(conn, sql, rsh, params));
    }

    /**
     * Executes the given SELECT SQL without any replacement parameters.
     * The {@code Connection} is retrieved from the
     * {@code DataSource} set in the constructor.
     * @param <T> The type of object that the handler returns
     * @param sql The SQL statement to execute.
     * @param rsh The handler used to create the result object from
     * the {@code ResultSet}.
     *
     * @return A {@code Future} which returns the result of the query call.
     * @throws SQLException if a database access error occurs
     */
    public <T> Future<T> query(final String sql, final ResultSetHandler<T> rsh) throws SQLException {
        return executorService.submit(() -> queryRunner.query(sql, rsh));
    }

    /**
     * Executes the given SELECT SQL query and returns a result object.
     * The {@code Connection} is retrieved from the
     * {@code DataSource} set in the constructor.
     * @param <T> The type of object that the handler returns
     * @param sql The SQL statement to execute.
     * @param rsh The handler used to create the result object from
     * the {@code ResultSet}.
     * @param params Initialize the PreparedStatement's IN parameters with
     * this array.
     * @return A {@code Future} which returns the result of the query call.
     * @throws SQLException if a database access error occurs
     */
    public <T> Future<T> query(final String sql, final ResultSetHandler<T> rsh, final Object... params) throws SQLException {
        return executorService.submit(() -> queryRunner.query(sql, rsh, params));
    }

    /**
     * Execute an SQL INSERT, UPDATE, or DELETE query without replacement
     * parameters.
     *
     * @param conn The connection to use to run the query.
     * @param sql The SQL to execute.
     * @return A {@code Future} which returns the number of rows updated.
     * @throws SQLException if a database access error occurs
     */
    public Future<Integer> update(final Connection conn, final String sql) throws SQLException {
        return executorService.submit(() -> Integer.valueOf(queryRunner.update(conn, sql)));
    }

    /**
     * Execute an SQL INSERT, UPDATE, or DELETE query with a single replacement
     * parameter.
     *
     * @param conn The connection to use to run the query.
     * @param sql The SQL to execute.
     * @param param The replacement parameter.
     * @return A {@code Future} which returns the number of rows updated.
     * @throws SQLException if a database access error occurs
     */
    public Future<Integer> update(final Connection conn, final String sql, final Object param) throws SQLException {
        return executorService.submit(() -> Integer.valueOf(queryRunner.update(conn, sql, param)));
    }

    /**
     * Execute an SQL INSERT, UPDATE, or DELETE query.
     *
     * @param conn The connection to use to run the query.
     * @param sql The SQL to execute.
     * @param params The query replacement parameters.
     * @return A {@code Future} which returns the number of rows updated.
     * @throws SQLException if a database access error occurs
     */
    public Future<Integer> update(final Connection conn, final String sql, final Object... params) throws SQLException {
        return executorService.submit(() -> Integer.valueOf(queryRunner.update(conn, sql, params)));
    }

    /**
     * Executes the given INSERT, UPDATE, or DELETE SQL statement without
     * any replacement parameters. The {@code Connection} is retrieved
     * from the {@code DataSource} set in the constructor.  This
     * {@code Connection} must be in auto-commit mode or the update will
     * not be saved.
     *
     * @param sql The SQL statement to execute.
     * @throws SQLException if a database access error occurs
     * @return A {@code Future} which returns the number of rows updated.
     */
    public Future<Integer> update(final String sql) throws SQLException {
        return executorService.submit(() -> Integer.valueOf(queryRunner.update(sql)));
    }

    /**
     * Executes the given INSERT, UPDATE, or DELETE SQL statement with
     * a single replacement parameter.  The {@code Connection} is
     * retrieved from the {@code DataSource} set in the constructor.
     * This {@code Connection} must be in auto-commit mode or the
     * update will not be saved.
     *
     * @param sql The SQL statement to execute.
     * @param param The replacement parameter.
     * @throws SQLException if a database access error occurs
     * @return A {@code Future} which returns the number of rows updated.
     */
    public Future<Integer> update(final String sql, final Object param) throws SQLException {
        return executorService.submit(() -> Integer.valueOf(queryRunner.update(sql, param)));
    }

    /**
     * Executes the given INSERT, UPDATE, or DELETE SQL statement.  The
     * {@code Connection} is retrieved from the {@code DataSource}
     * set in the constructor.  This {@code Connection} must be in
     * auto-commit mode or the update will not be saved.
     *
     * @param sql The SQL statement to execute.
     * @param params Initializes the PreparedStatement's IN (i.e. '?')
     * parameters.
     * @throws SQLException if a database access error occurs
     * @return A {@code Future} which returns the number of rows updated.
     */
    public Future<Integer> update(final String sql, final Object... params) throws SQLException {
        return executorService.submit(() -> Integer.valueOf(queryRunner.update(sql, params)));
    }

}