Use custom async thread execution pool
The new pool has a limited thread size (cpu core count - 1) and should avoid bot attacks to crash the machine.
This commit is contained in:
parent
72853a382f
commit
a574245bb9
@ -333,7 +333,7 @@ public class AuthMe extends JavaPlugin {
|
||||
}
|
||||
|
||||
// Wait for tasks and close data source
|
||||
new TaskCloser(this, database).run();
|
||||
new TaskCloser(this, database, bukkitService).run();
|
||||
|
||||
// Disabled correctly
|
||||
ConsoleLogger.info("AuthMe " + this.getDescription().getVersion() + " disabled!");
|
||||
|
||||
@ -3,6 +3,7 @@ package fr.xephi.authme.initialization;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import fr.xephi.authme.AuthMe;
|
||||
import fr.xephi.authme.datasource.DataSource;
|
||||
import fr.xephi.authme.service.BukkitService;
|
||||
import org.bukkit.scheduler.BukkitScheduler;
|
||||
import org.bukkit.scheduler.BukkitWorker;
|
||||
|
||||
@ -22,22 +23,27 @@ public class TaskCloser implements Runnable {
|
||||
private final Logger logger;
|
||||
private final AuthMe plugin;
|
||||
private final DataSource dataSource;
|
||||
private final BukkitService bukkitService;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param plugin the plugin instance
|
||||
* @param dataSource the data source (nullable)
|
||||
* @param bukkitService the bukkit service instance (nullable)
|
||||
*/
|
||||
public TaskCloser(AuthMe plugin, DataSource dataSource) {
|
||||
public TaskCloser(AuthMe plugin, DataSource dataSource, BukkitService bukkitService) {
|
||||
this.scheduler = plugin.getServer().getScheduler();
|
||||
this.logger = plugin.getLogger();
|
||||
this.plugin = plugin;
|
||||
this.dataSource = dataSource;
|
||||
this.bukkitService = bukkitService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
logger.info("Closing scheduled tasks:");
|
||||
|
||||
List<Integer> pendingTasks = getPendingTasks();
|
||||
logger.log(Level.INFO, "Waiting for {0} tasks to finish", pendingTasks.size());
|
||||
int progress = 0;
|
||||
@ -69,6 +75,16 @@ public class TaskCloser implements Runnable {
|
||||
tries--;
|
||||
}
|
||||
|
||||
logger.info("Closing async tasks...");
|
||||
if(bukkitService != null) {
|
||||
try {
|
||||
bukkitService.closeAsyncPool();
|
||||
} catch (InterruptedException e) {
|
||||
logger.log(Level.WARNING, "Unable to close some async task", e);
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("Closing datasource...");
|
||||
if (dataSource != null) {
|
||||
dataSource.close();
|
||||
}
|
||||
|
||||
@ -5,6 +5,7 @@ import fr.xephi.authme.ConsoleLogger;
|
||||
import fr.xephi.authme.initialization.SettingsDependent;
|
||||
import fr.xephi.authme.settings.Settings;
|
||||
import fr.xephi.authme.settings.properties.PluginSettings;
|
||||
import fr.xephi.authme.util.Utils;
|
||||
import org.bukkit.BanEntry;
|
||||
import org.bukkit.BanList;
|
||||
import org.bukkit.Bukkit;
|
||||
@ -25,6 +26,10 @@ import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Service for operations requiring the Bukkit API, such as for scheduling.
|
||||
@ -41,13 +46,35 @@ public class BukkitService implements SettingsDependent {
|
||||
private Method getOnlinePlayers;
|
||||
private boolean useAsyncTasks;
|
||||
|
||||
// Async executor
|
||||
private ThreadPoolExecutor asyncExecutor;
|
||||
|
||||
@Inject
|
||||
BukkitService(AuthMe authMe, Settings settings) {
|
||||
this.authMe = authMe;
|
||||
getOnlinePlayersIsCollection = initializeOnlinePlayersIsCollectionField();
|
||||
|
||||
int coreCount = Utils.getCoreCount();
|
||||
// Keep 1 free core for the main thread and the OS
|
||||
if(coreCount != 1) {
|
||||
coreCount--;
|
||||
}
|
||||
asyncExecutor = new ThreadPoolExecutor(coreCount, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
|
||||
new SynchronousQueue<>());
|
||||
|
||||
reload(settings);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait the shutdown of the async execution pool.
|
||||
*
|
||||
* @throws InterruptedException if the shutdown is interrupted
|
||||
*/
|
||||
public void closeAsyncPool() throws InterruptedException {
|
||||
asyncExecutor.shutdown();
|
||||
asyncExecutor.awaitTermination(30, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedules a once off task to occur as soon as possible.
|
||||
* <p>
|
||||
@ -135,12 +162,12 @@ public class BukkitService implements SettingsDependent {
|
||||
* Returns a task that will run asynchronously.
|
||||
*
|
||||
* @param task the task to be run
|
||||
* @return a BukkitTask that contains the id number
|
||||
* @throws IllegalArgumentException if plugin is null
|
||||
* @throws IllegalArgumentException if task is null
|
||||
*/
|
||||
public BukkitTask runTaskAsynchronously(Runnable task) {
|
||||
return Bukkit.getScheduler().runTaskAsynchronously(authMe, task);
|
||||
public void runTaskAsynchronously(Runnable task) {
|
||||
asyncExecutor.execute(task);
|
||||
//Bukkit.getScheduler().runTaskAsynchronously(authMe, task);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -3,6 +3,7 @@ package fr.xephi.authme.initialization;
|
||||
import fr.xephi.authme.AuthMe;
|
||||
import fr.xephi.authme.ReflectionTestUtils;
|
||||
import fr.xephi.authme.datasource.DataSource;
|
||||
import fr.xephi.authme.service.BukkitService;
|
||||
import org.bukkit.Server;
|
||||
import org.bukkit.plugin.Plugin;
|
||||
import org.bukkit.plugin.PluginLogger;
|
||||
@ -49,6 +50,8 @@ public class TaskCloserTest {
|
||||
private BukkitScheduler bukkitScheduler;
|
||||
@Mock
|
||||
private DataSource dataSource;
|
||||
@Mock
|
||||
private BukkitService bukkitService;
|
||||
|
||||
@Before
|
||||
public void initAuthMe() {
|
||||
@ -56,7 +59,7 @@ public class TaskCloserTest {
|
||||
given(server.getScheduler()).willReturn(bukkitScheduler);
|
||||
ReflectionTestUtils.setField(JavaPlugin.class, authMe, "server", server);
|
||||
ReflectionTestUtils.setField(JavaPlugin.class, authMe, "logger", logger);
|
||||
taskCloser = spy(new TaskCloser(authMe, dataSource));
|
||||
taskCloser = spy(new TaskCloser(authMe, dataSource, bukkitService));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -120,7 +123,7 @@ public class TaskCloserTest {
|
||||
/** Test implementation for {@link #shouldStopForInterruptedThread()}. */
|
||||
private void shouldStopForInterruptedThread0() throws InterruptedException {
|
||||
// given
|
||||
taskCloser = spy(new TaskCloser(authMe, null));
|
||||
taskCloser = spy(new TaskCloser(authMe, null, bukkitService));
|
||||
// First two times do nothing, third time throw exception when we sleep
|
||||
doNothing().doNothing().doThrow(InterruptedException.class).when(taskCloser).sleep();
|
||||
mockActiveWorkers();
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user