001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.commons.jcs3.jcache.extras.writer; 020 021import javax.cache.Cache; 022import javax.cache.configuration.Factory; 023import javax.cache.integration.CacheWriter; 024import javax.cache.integration.CacheWriterException; 025import java.io.Closeable; 026import java.io.IOException; 027import java.util.Collection; 028import java.util.List; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.Executors; 031import java.util.concurrent.ThreadFactory; 032import java.util.concurrent.atomic.AtomicInteger; 033import java.util.logging.Level; 034import java.util.logging.Logger; 035 036public class AsyncCacheWriter<K, V> implements CacheWriter<K, V>, Closeable, Factory<CacheWriter<K, V>> 037{ 038 private static final Logger LOGGER = Logger.getLogger(AsyncCacheWriter.class.getName()); 039 040 private final CacheWriter<K, V> writer; 041 private final ExecutorService pool; 042 043 public AsyncCacheWriter(final CacheWriter<K, V> delegate, final int poolSize) 044 { 045 writer = delegate; 046 pool = Executors.newFixedThreadPool( 047 poolSize, new DaemonThreadFactory(delegate.getClass().getName() + "-" + delegate.hashCode() + "-")); 048 } 049 050 @Override 051 public void write(final Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException 052 { 053 pool.submit(new ExceptionProtectionRunnable() 054 { 055 @Override 056 public void doRun() 057 { 058 writer.write(entry); 059 } 060 }); 061 } 062 063 @Override 064 public void writeAll(final Collection<Cache.Entry<? extends K, ? extends V>> entries) throws CacheWriterException 065 { 066 pool.submit(new ExceptionProtectionRunnable() 067 { 068 @Override 069 public void doRun() 070 { 071 writer.writeAll(entries); 072 } 073 }); 074 } 075 076 @Override 077 public void delete(final Object key) throws CacheWriterException 078 { 079 pool.submit(new ExceptionProtectionRunnable() 080 { 081 @Override 082 public void doRun() 083 { 084 writer.delete(key); 085 } 086 }); 087 } 088 089 @Override 090 public void deleteAll(final Collection<?> keys) throws CacheWriterException 091 { 092 pool.submit(new ExceptionProtectionRunnable() 093 { 094 @Override 095 public void doRun() 096 { 097 writer.deleteAll(keys); 098 } 099 }); 100 } 101 102 @Override 103 public void close() throws IOException 104 { 105 final List<Runnable> runnables = pool.shutdownNow(); 106 for (final Runnable r : runnables) 107 { 108 r.run(); 109 } 110 } 111 112 @Override 113 public CacheWriter<K, V> create() 114 { 115 return this; 116 } 117 118 // avoid dep on impl 119 private static class DaemonThreadFactory implements ThreadFactory 120 { 121 private final AtomicInteger index = new AtomicInteger(1); 122 private final String prefix; 123 124 public DaemonThreadFactory(final String prefix) 125 { 126 this.prefix = prefix; 127 } 128 129 @Override 130 public Thread newThread( final Runnable runner ) 131 { 132 final Thread t = new Thread( runner ); 133 t.setName(prefix + index.getAndIncrement()); 134 t.setDaemon(true); 135 return t; 136 } 137 } 138 139 private static abstract class ExceptionProtectionRunnable implements Runnable 140 { 141 @Override 142 public void run() 143 { 144 try 145 { 146 doRun(); 147 } 148 catch (final Exception e) 149 { 150 LOGGER.log(Level.SEVERE, e.getMessage(), e); 151 } 152 } 153 154 protected abstract void doRun(); 155 } 156}