1. Gambaran keseluruhan
Dalam tutorial ringkas ini, kita akan meneroka asas-asas semaphores dan mutexes di Java.
2. Semaphore
Kita akan mulakan dengan java.util.concurrent.Semaphore. Kita boleh menggunakan semaphores untuk mengehadkan bilangan utas serentak yang mengakses sumber tertentu.
Dalam contoh berikut, kami akan mengimplementasikan barisan masuk mudah untuk menghadkan bilangan pengguna dalam sistem:
class LoginQueueUsingSemaphore { private Semaphore semaphore; public LoginQueueUsingSemaphore(int slotLimit) { semaphore = new Semaphore(slotLimit); } boolean tryLogin() { return semaphore.tryAcquire(); } void logout() { semaphore.release(); } int availableSlots() { return semaphore.availablePermits(); } }
Perhatikan bagaimana kami menggunakan kaedah berikut:
- tryAcquire () - kembali benar jika izin tersedia dengan segera dan dapatkannya sebaliknya akan kembali palsu, tetapi memperoleh () memperoleh izin dan menyekat sehingga ada yang tersedia
- lepaskan () - lepaskan permit
- availablePermits () - mengembalikan jumlah permit semasa yang ada
Untuk menguji barisan masuk kami, pertama-tama kami akan berusaha mencapai had dan memeriksa apakah percubaan log masuk seterusnya akan disekat:
@Test public void givenLoginQueue_whenReachLimit_thenBlocked() { int slots = 10; ExecutorService executorService = Executors.newFixedThreadPool(slots); LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(loginQueue::tryLogin)); executorService.shutdown(); assertEquals(0, loginQueue.availableSlots()); assertFalse(loginQueue.tryLogin()); }
Seterusnya, kita akan melihat apakah ada slot yang tersedia selepas logout:
@Test public void givenLoginQueue_whenLogout_thenSlotsAvailable() { int slots = 10; ExecutorService executorService = Executors.newFixedThreadPool(slots); LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(loginQueue::tryLogin)); executorService.shutdown(); assertEquals(0, loginQueue.availableSlots()); loginQueue.logout(); assertTrue(loginQueue.availableSlots() > 0); assertTrue(loginQueue.tryLogin()); }
3. Semaphore Berwaktu
Seterusnya, kita akan membincangkan Apache Commons TimedSemaphore. TimedSemaphore membenarkan sebilangan izin sebagai Semaphore sederhana tetapi dalam jangka masa tertentu, setelah tempoh ini tetapan semula waktu dan semua izin dilepaskan.
Kita boleh menggunakan TimedSemaphore untuk membina barisan tunda sederhana seperti berikut:
class DelayQueueUsingTimedSemaphore { private TimedSemaphore semaphore; DelayQueueUsingTimedSemaphore(long period, int slotLimit) { semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit); } boolean tryAdd() { return semaphore.tryAcquire(); } int availableSlots() { return semaphore.getAvailablePermits(); } }
Apabila kita menggunakan barisan tunda dengan satu saat sebagai jangka masa dan setelah menggunakan semua slot dalam satu saat, tidak ada yang tersedia:
public void givenDelayQueue_whenReachLimit_thenBlocked() { int slots = 50; ExecutorService executorService = Executors.newFixedThreadPool(slots); DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(delayQueue::tryAdd)); executorService.shutdown(); assertEquals(0, delayQueue.availableSlots()); assertFalse(delayQueue.tryAdd()); }
Tetapi setelah tidur untuk beberapa waktu, semaphore harus menetapkan semula dan melepaskan izin :
@Test public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException { int slots = 50; ExecutorService executorService = Executors.newFixedThreadPool(slots); DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(delayQueue::tryAdd)); executorService.shutdown(); assertEquals(0, delayQueue.availableSlots()); Thread.sleep(1000); assertTrue(delayQueue.availableSlots() > 0); assertTrue(delayQueue.tryAdd()); }
4. Semaphore vs Mutex
Mutex bertindak serupa dengan semaphore binari, kita boleh menggunakannya untuk melaksanakan pengecualian bersama.
Dalam contoh berikut, kami akan menggunakan semaphore binari sederhana untuk membina pembilang:
class CounterUsingMutex { private Semaphore mutex; private int count; CounterUsingMutex() { mutex = new Semaphore(1); count = 0; } void increase() throws InterruptedException { mutex.acquire(); this.count = this.count + 1; Thread.sleep(1000); mutex.release(); } int getCount() { return this.count; } boolean hasQueuedThreads() { return mutex.hasQueuedThreads(); } }
Apabila banyak utas cuba masuk ke kaunter sekaligus, mereka akan disekat dalam barisan :
@Test public void whenMutexAndMultipleThreads_thenBlocked() throws InterruptedException { int count = 5; ExecutorService executorService = Executors.newFixedThreadPool(count); CounterUsingMutex counter = new CounterUsingMutex(); IntStream.range(0, count) .forEach(user -> executorService.execute(() -> { try { counter.increase(); } catch (InterruptedException e) { e.printStackTrace(); } })); executorService.shutdown(); assertTrue(counter.hasQueuedThreads()); }
Semasa kami menunggu, semua utas akan memasuki kaunter dan tidak ada utas yang tersisa dalam barisan:
@Test public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount() throws InterruptedException { int count = 5; ExecutorService executorService = Executors.newFixedThreadPool(count); CounterUsingMutex counter = new CounterUsingMutex(); IntStream.range(0, count) .forEach(user -> executorService.execute(() -> { try { counter.increase(); } catch (InterruptedException e) { e.printStackTrace(); } })); executorService.shutdown(); assertTrue(counter.hasQueuedThreads()); Thread.sleep(5000); assertFalse(counter.hasQueuedThreads()); assertEquals(count, counter.getCount()); }
5. Kesimpulan
Dalam artikel ini, kami meneroka asas-asas semaphores di Jawa.
Seperti biasa, kod sumber penuh tersedia di GitHub.