diff --git a/src/main/java/fr/xephi/authme/AuthMe.java b/src/main/java/fr/xephi/authme/AuthMe.java index 78620307..7c722b22 100644 --- a/src/main/java/fr/xephi/authme/AuthMe.java +++ b/src/main/java/fr/xephi/authme/AuthMe.java @@ -333,7 +333,7 @@ public class AuthMe extends JavaPlugin { } // Wait for tasks and close data source - new TaskCloser(this, database).run(); + new TaskCloser(this, database, bukkitService).run(); // Disabled correctly ConsoleLogger.info("AuthMe " + this.getDescription().getVersion() + " disabled!"); diff --git a/src/main/java/fr/xephi/authme/initialization/TaskCloser.java b/src/main/java/fr/xephi/authme/initialization/TaskCloser.java index c1126975..105e4dbd 100644 --- a/src/main/java/fr/xephi/authme/initialization/TaskCloser.java +++ b/src/main/java/fr/xephi/authme/initialization/TaskCloser.java @@ -3,6 +3,7 @@ package fr.xephi.authme.initialization; import com.google.common.annotations.VisibleForTesting; import fr.xephi.authme.AuthMe; import fr.xephi.authme.datasource.DataSource; +import fr.xephi.authme.service.BukkitService; import org.bukkit.scheduler.BukkitScheduler; import org.bukkit.scheduler.BukkitWorker; @@ -22,22 +23,27 @@ public class TaskCloser implements Runnable { private final Logger logger; private final AuthMe plugin; private final DataSource dataSource; + private final BukkitService bukkitService; /** * Constructor. * * @param plugin the plugin instance * @param dataSource the data source (nullable) + * @param bukkitService the bukkit service instance (nullable) */ - public TaskCloser(AuthMe plugin, DataSource dataSource) { + public TaskCloser(AuthMe plugin, DataSource dataSource, BukkitService bukkitService) { this.scheduler = plugin.getServer().getScheduler(); this.logger = plugin.getLogger(); this.plugin = plugin; this.dataSource = dataSource; + this.bukkitService = bukkitService; } @Override public void run() { + logger.info("Closing scheduled tasks:"); + List pendingTasks = getPendingTasks(); logger.log(Level.INFO, "Waiting for {0} tasks to finish", pendingTasks.size()); int progress = 0; @@ -69,6 +75,16 @@ public class TaskCloser implements Runnable { tries--; } + logger.info("Closing async tasks..."); + if(bukkitService != null) { + try { + bukkitService.closeAsyncPool(); + } catch (InterruptedException e) { + logger.log(Level.WARNING, "Unable to close some async task", e); + } + } + + logger.info("Closing datasource..."); if (dataSource != null) { dataSource.close(); } diff --git a/src/main/java/fr/xephi/authme/service/BukkitService.java b/src/main/java/fr/xephi/authme/service/BukkitService.java index 20aa23c2..188b7c42 100644 --- a/src/main/java/fr/xephi/authme/service/BukkitService.java +++ b/src/main/java/fr/xephi/authme/service/BukkitService.java @@ -5,6 +5,7 @@ import fr.xephi.authme.ConsoleLogger; import fr.xephi.authme.initialization.SettingsDependent; import fr.xephi.authme.settings.Settings; import fr.xephi.authme.settings.properties.PluginSettings; +import fr.xephi.authme.util.Utils; import org.bukkit.BanEntry; import org.bukkit.BanList; import org.bukkit.Bukkit; @@ -25,6 +26,10 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Service for operations requiring the Bukkit API, such as for scheduling. @@ -41,13 +46,35 @@ public class BukkitService implements SettingsDependent { private Method getOnlinePlayers; private boolean useAsyncTasks; + // Async executor + private ThreadPoolExecutor asyncExecutor; + @Inject BukkitService(AuthMe authMe, Settings settings) { this.authMe = authMe; getOnlinePlayersIsCollection = initializeOnlinePlayersIsCollectionField(); + + int coreCount = Utils.getCoreCount(); + // Keep 1 free core for the main thread and the OS + if(coreCount != 1) { + coreCount--; + } + asyncExecutor = new ThreadPoolExecutor(coreCount, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, + new SynchronousQueue<>()); + reload(settings); } + /** + * Wait the shutdown of the async execution pool. + * + * @throws InterruptedException if the shutdown is interrupted + */ + public void closeAsyncPool() throws InterruptedException { + asyncExecutor.shutdown(); + asyncExecutor.awaitTermination(30, TimeUnit.SECONDS); + } + /** * Schedules a once off task to occur as soon as possible. *

@@ -135,12 +162,12 @@ public class BukkitService implements SettingsDependent { * Returns a task that will run asynchronously. * * @param task the task to be run - * @return a BukkitTask that contains the id number * @throws IllegalArgumentException if plugin is null * @throws IllegalArgumentException if task is null */ - public BukkitTask runTaskAsynchronously(Runnable task) { - return Bukkit.getScheduler().runTaskAsynchronously(authMe, task); + public void runTaskAsynchronously(Runnable task) { + asyncExecutor.execute(task); + //Bukkit.getScheduler().runTaskAsynchronously(authMe, task); } /** diff --git a/src/test/java/fr/xephi/authme/initialization/TaskCloserTest.java b/src/test/java/fr/xephi/authme/initialization/TaskCloserTest.java index 7f677e0c..a76aedd6 100644 --- a/src/test/java/fr/xephi/authme/initialization/TaskCloserTest.java +++ b/src/test/java/fr/xephi/authme/initialization/TaskCloserTest.java @@ -3,6 +3,7 @@ package fr.xephi.authme.initialization; import fr.xephi.authme.AuthMe; import fr.xephi.authme.ReflectionTestUtils; import fr.xephi.authme.datasource.DataSource; +import fr.xephi.authme.service.BukkitService; import org.bukkit.Server; import org.bukkit.plugin.Plugin; import org.bukkit.plugin.PluginLogger; @@ -49,6 +50,8 @@ public class TaskCloserTest { private BukkitScheduler bukkitScheduler; @Mock private DataSource dataSource; + @Mock + private BukkitService bukkitService; @Before public void initAuthMe() { @@ -56,7 +59,7 @@ public class TaskCloserTest { given(server.getScheduler()).willReturn(bukkitScheduler); ReflectionTestUtils.setField(JavaPlugin.class, authMe, "server", server); ReflectionTestUtils.setField(JavaPlugin.class, authMe, "logger", logger); - taskCloser = spy(new TaskCloser(authMe, dataSource)); + taskCloser = spy(new TaskCloser(authMe, dataSource, bukkitService)); } @Test @@ -120,7 +123,7 @@ public class TaskCloserTest { /** Test implementation for {@link #shouldStopForInterruptedThread()}. */ private void shouldStopForInterruptedThread0() throws InterruptedException { // given - taskCloser = spy(new TaskCloser(authMe, null)); + taskCloser = spy(new TaskCloser(authMe, null, bukkitService)); // First two times do nothing, third time throw exception when we sleep doNothing().doNothing().doThrow(InterruptedException.class).when(taskCloser).sleep(); mockActiveWorkers();