diff --git a/bundles/org.openhab.core.automation/src/main/java/org/openhab/core/automation/internal/TriggerHandlerCallbackImpl.java b/bundles/org.openhab.core.automation/src/main/java/org/openhab/core/automation/internal/TriggerHandlerCallbackImpl.java index c14bd20e8..455caa0cb 100644 --- a/bundles/org.openhab.core.automation/src/main/java/org/openhab/core/automation/internal/TriggerHandlerCallbackImpl.java +++ b/bundles/org.openhab.core.automation/src/main/java/org/openhab/core/automation/internal/TriggerHandlerCallbackImpl.java @@ -13,7 +13,6 @@ package org.openhab.core.automation.internal; import java.util.Map; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -23,7 +22,7 @@ import org.openhab.core.automation.RuleStatus; import org.openhab.core.automation.RuleStatusInfo; import org.openhab.core.automation.Trigger; import org.openhab.core.automation.handler.TriggerHandlerCallback; -import org.openhab.core.common.NamedThreadFactory; +import org.openhab.core.common.ThreadPoolManager; /** * This class is implementation of {@link TriggerHandlerCallback} used by the {@link Trigger}s to notify rule engine @@ -48,7 +47,7 @@ public class TriggerHandlerCallbackImpl implements TriggerHandlerCallback { protected TriggerHandlerCallbackImpl(RuleEngineImpl re, String ruleUID) { this.re = re; this.ruleUID = ruleUID; - executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("rule-" + ruleUID)); + this.executor = ThreadPoolManager.getPoolBasedSequentialScheduledExecutorService("rules", "rule-" + ruleUID); } @Override diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/common/PoolBasedSequentialScheduledExecutorService.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/common/PoolBasedSequentialScheduledExecutorService.java new file mode 100644 index 000000000..3b49427cf --- /dev/null +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/common/PoolBasedSequentialScheduledExecutorService.java @@ -0,0 +1,487 @@ +/** + * Copyright (c) 2010-2024 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.common; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.BiFunction; +import java.util.function.Function; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; + +/** + * A ScheduledExecutorService that will sequentially perform the tasks like a + * {@link Executors#newSingleThreadScheduledExecutor} backed by a thread pool. + * This is a drop in replacement to a ScheduledExecutorService with one thread to avoid a lot of threads created, idling + * most of the time and wasting memory on low-end devices. + * + * The mechanism to block the ScheduledExecutorService to run tasks concurrently is based on a chain of + * {@link CompletableFuture}s. + * Each instance has a reference to the last CompletableFuture and will call handleAsync to add a new task. + * + * @author Jörg Sautter - Initial contribution + */ +@NonNullByDefault +class PoolBasedSequentialScheduledExecutorService implements ScheduledExecutorService { + + private final WorkQueueEntry empty; + private final ScheduledThreadPoolExecutor pool; + private final List> scheduled; + private final ScheduledFuture cleaner; + private @Nullable WorkQueueEntry tail; + + public PoolBasedSequentialScheduledExecutorService(ScheduledThreadPoolExecutor pool) { + if (pool.getMaximumPoolSize() != Integer.MAX_VALUE) { + throw new IllegalArgumentException("the pool must scale unlimited to avoid potential dead locks!"); + } + + this.pool = pool; + + // prepare the WorkQueueEntry we are using when no tasks are pending + RunnableCompletableFuture future = new RunnableCompletableFuture<>(); + future.complete(null); + empty = new WorkQueueEntry(null, null, future); + + // tracks scheduled tasks alive + this.scheduled = new ArrayList<>(); + + tail = empty; + + // clean up to ensure we do not keep references to old tasks + cleaner = this.scheduleWithFixedDelay(() -> { + synchronized (this) { + scheduled.removeIf((sf) -> sf.isCancelled()); + + if (tail == null) { + // the service is shutdown + return; + } + + WorkQueueEntry entry = tail; + + while (entry.prev != null) { + if (entry.prev.future.isDone()) { + entry.prev = null; + break; + } + entry = entry.prev; + } + + if (tail != empty && tail.future.isDone()) { + // replace the tail with empty to ensure we do not prevent GC + tail = empty; + } + } + }, 2, 4, TimeUnit.SECONDS); + } + + @Override + public ScheduledFuture schedule(@Nullable Runnable command, long delay, @Nullable TimeUnit unit) { + return schedule((origin) -> pool.schedule(() -> { + // we block the thread here, in worst case new threads are spawned + submitToWorkQueue(origin.join(), command).join(); + }, delay, unit)); + } + + @Override + public ScheduledFuture schedule(@Nullable Callable callable, long delay, @Nullable TimeUnit unit) { + return schedule((origin) -> pool.schedule(() -> { + // we block the thread here, in worst case new threads are spawned + return submitToWorkQueue(origin.join(), callable).join(); + }, delay, unit)); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(@Nullable Runnable command, long initialDelay, long period, + @Nullable TimeUnit unit) { + return schedule((origin) -> pool.scheduleAtFixedRate(() -> { + CompletableFuture submitted; + + try { + // we block the thread here, in worst case new threads are spawned + submitted = submitToWorkQueue(origin.join(), command); + } catch (RejectedExecutionException ex) { + // the pool has been shutdown, scheduled tasks should cancel + return; + } + + submitted.join(); + }, initialDelay, period, unit)); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(@Nullable Runnable command, long initialDelay, long delay, + @Nullable TimeUnit unit) { + return schedule((origin) -> pool.scheduleWithFixedDelay(() -> { + CompletableFuture submitted; + + try { + // we block the thread here, in worst case new threads are spawned + submitted = submitToWorkQueue(origin.join(), command); + } catch (RejectedExecutionException ex) { + // the pool has been shutdown, scheduled tasks should cancel + return; + } + + submitted.join(); + }, initialDelay, delay, unit)); + } + + private ScheduledFuture schedule( + Function>, ScheduledFuture> doSchedule) { + synchronized (this) { + if (tail == null) { + throw new RejectedExecutionException("this scheduled executor has been shutdown before"); + } + + CompletableFuture> origin = new CompletableFuture<>(); + ScheduledFuture future = doSchedule.apply(origin); + + scheduled.add((RunnableFuture) future); + origin.complete((RunnableFuture) future); + + return future; + } + } + + @Override + public void shutdown() { + synchronized (this) { + cleaner.cancel(false); + scheduled.removeIf((sf) -> { + sf.cancel(false); + return true; + }); + tail = null; + } + } + + @Override + @NonNullByDefault({}) + public List shutdownNow() { + synchronized (this) { + if (tail == null) { + return List.of(); + } + + // ensures we do not leak the internal cleaner as Runnable + cleaner.cancel(false); + + Set<@Nullable Runnable> runnables = Collections.newSetFromMap(new IdentityHashMap<>()); + WorkQueueEntry entry = tail; + scheduled.removeIf((sf) -> { + if (sf.cancel(false)) { + runnables.add(sf); + } + return true; + }); + tail = null; + + while (entry != null) { + if (!entry.future.cancel(false)) { + break; + } + + if (entry.origin != null) { + // entry has been submitted by a .schedule call + runnables.add(entry.origin); + } else { + // entry has been submitted by a .submit call + runnables.add(entry.future); + } + entry = entry.prev; + } + + return List.copyOf(runnables); + } + } + + @Override + public boolean isShutdown() { + synchronized (this) { + return pool == null; + } + } + + @Override + public boolean isTerminated() { + synchronized (this) { + return pool == null && tail.future.isDone(); + } + } + + @Override + public boolean awaitTermination(long timeout, @Nullable TimeUnit unit) throws InterruptedException { + long timeoutAt = System.currentTimeMillis() + unit.toMillis(timeout); + + while (!isTerminated()) { + if (System.currentTimeMillis() > timeoutAt) { + return false; + } + + Thread.onSpinWait(); + } + + return true; + } + + @Override + public Future submit(@Nullable Callable task) { + return submitToWorkQueue(null, task); + } + + private CompletableFuture submitToWorkQueue(RunnableFuture origin, @Nullable Runnable task) { + Callable callable = () -> { + task.run(); + + return null; + }; + + return submitToWorkQueue(origin, callable); + } + + private CompletableFuture submitToWorkQueue(@Nullable RunnableFuture origin, @Nullable Callable task) { + BiFunction action = (result, error) -> { + // ignore result & error, they are from the previous task + try { + return task.call(); + } catch (RuntimeException ex) { + throw ex; + } catch (Exception ex) { + // a small hack to throw the Exception unchecked + throw PoolBasedSequentialScheduledExecutorService.unchecked(ex); + } + }; + + synchronized (this) { + if (tail == null) { + throw new RejectedExecutionException("this scheduled executor has been shutdown before"); + } + + RunnableCompletableFuture cf = tail.future.handleAsync(action, pool); + + cf.setCallable(task); + + tail = new WorkQueueEntry(tail, origin, cf); + + return cf; + } + } + + private static E unchecked(Exception ex) throws E { + throw (E) ex; + } + + @Override + public Future submit(@Nullable Runnable task, T result) { + return submitToWorkQueue(null, () -> { + task.run(); + + return result; + }); + } + + @Override + public Future submit(@Nullable Runnable task) { + return submit(task, (Void) null); + } + + @Override + @NonNullByDefault({}) + public List> invokeAll(@Nullable Collection> tasks) + throws InterruptedException { + List> futures = new ArrayList<>(); + + for (Callable task : tasks) { + futures.add(submit(task)); + } + + // wait for all futures to complete + for (Future future : futures) { + try { + future.get(); + } catch (ExecutionException e) { + // ignore, we are just waiting here for the futures to complete + } + } + + return futures; + } + + @Override + @NonNullByDefault({}) + public List> invokeAll(@Nullable Collection> tasks, long timeout, + TimeUnit unit) throws InterruptedException { + List> futures = new ArrayList<>(); + + for (Callable task : tasks) { + futures.add(submitToWorkQueue(null, task).orTimeout(timeout, unit)); + } + + // wait for all futures to complete + for (Future future : futures) { + try { + future.get(); + } catch (ExecutionException e) { + // ignore, we are just waiting here for the futures to complete + } + } + + return futures; + } + + @Override + public T invokeAny(@Nullable Collection> tasks) + throws InterruptedException, ExecutionException { + try { + return invokeAny(tasks, Long.MAX_VALUE); + } catch (TimeoutException ex) { + throw new AssertionError(ex); + } + } + + @Override + public T invokeAny(@Nullable Collection> tasks, long timeout, + @Nullable TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + long timeoutAt = System.currentTimeMillis() + unit.toMillis(timeout); + + return invokeAny(tasks, timeoutAt); + } + + private T invokeAny(@Nullable Collection> tasks, long timeoutAt) + throws InterruptedException, ExecutionException, TimeoutException { + List> futures = new ArrayList<>(); + + for (Callable task : tasks) { + futures.add(submitToWorkQueue(null, task)); + } + + // wait for any future to complete + while (timeoutAt >= System.currentTimeMillis()) { + boolean allDone = true; + + for (CompletableFuture future : futures) { + if (future.isDone()) { + if (!future.isCompletedExceptionally()) { + // stop the others + for (CompletableFuture tooLate : futures) { + if (tooLate != future) { + tooLate.cancel(true); + } + } + + return future.join(); + } + } else { + allDone = false; + } + } + + if (allDone) { + ExecutionException exe = new ExecutionException("all tasks failed", null); + + for (CompletableFuture future : futures) { + try { + future.get(); + throw new AssertionError("all tasks should be failed"); + } catch (ExecutionException ex) { + exe.addSuppressed(ex); + } + } + + throw exe; + } + + Thread.onSpinWait(); + } + + for (CompletableFuture tooLate : futures) { + tooLate.cancel(true); + } + + throw new TimeoutException("none of the tasks did complete in time"); + } + + @Override + public void execute(Runnable command) { + submit(command); + } + + static class WorkQueueEntry { + private @Nullable WorkQueueEntry prev; + private @Nullable RunnableFuture origin; + private final RunnableCompletableFuture future; + + public WorkQueueEntry(@Nullable WorkQueueEntry prev, @Nullable RunnableFuture origin, + RunnableCompletableFuture future) { + this.prev = prev; + this.origin = origin; + this.future = future; + } + } + + static class RunnableCompletableFuture extends CompletableFuture implements RunnableFuture { + private @Nullable Callable callable; + + public RunnableCompletableFuture() { + callable = null; + } + + public void setCallable(@Nullable Callable callable) { + this.callable = callable; + } + + @Override + public RunnableCompletableFuture newIncompleteFuture() { + return new RunnableCompletableFuture<>(); + } + + @Override + public RunnableCompletableFuture handleAsync(BiFunction fn, + Executor executor) { + return (RunnableCompletableFuture) super.handleAsync(fn, executor); + } + + @Override + public void run() { + if (this.isDone()) { + // a FutureTask does also return here without exception + return; + } + + try { + this.complete(callable.call()); + } catch (Error | Exception t) { + this.completeExceptionally(t); + } + } + } +} diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/common/ThreadPoolManager.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/common/ThreadPoolManager.java index 6c5e750ed..b23501f54 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/common/ThreadPoolManager.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/common/ThreadPoolManager.java @@ -22,6 +22,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -115,6 +116,27 @@ public class ThreadPoolManager { } } + /** + * Returns an instance of a scheduled service, which will sequentially execute submitted tasks. If a task is + * currently running the task is queued until the previous one is completed, this also applies for scheduled tasks. + * The service might execute submitted task might in different threads, but still one after the other. + * If it is the first request for the given pool name and a pool is used, the instance is newly created. + * + * @param poolName a short name used to identify the pool, if a thread pool is used e.g. "bluetooth-discovery" + * @param threadName a short name used to identify the thread if no thread pool is used, e.g. "bluetooth" + * @return an instance to use + */ + public static ScheduledExecutorService getPoolBasedSequentialScheduledExecutorService(String poolName, + String threadName) { + if (configs.getOrDefault(poolName, 0) > 0) { + ScheduledThreadPoolExecutor pool = getScheduledPoolUnwrapped(poolName); + + return new PoolBasedSequentialScheduledExecutorService((ScheduledThreadPoolExecutor) pool); + } else { + return Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(threadName)); + } + } + /** * Returns an instance of a scheduled thread pool service. If it is the first request for the given pool name, the * instance is newly created. diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/events/EventHandler.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/events/EventHandler.java index 2a7d66cf4..6a5bf4d20 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/events/EventHandler.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/events/EventHandler.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; import org.openhab.core.common.NamedThreadFactory; +import org.openhab.core.common.ThreadPoolManager; import org.openhab.core.events.Event; import org.openhab.core.events.EventFactory; import org.openhab.core.events.EventFilter; @@ -69,9 +70,8 @@ public class EventHandler implements AutoCloseable { } private synchronized ExecutorRecord createExecutorRecord(Class subscriber) { - return new ExecutorRecord( - Executors.newSingleThreadExecutor(new NamedThreadFactory("eventexecutor-" + executors.size())), - new AtomicInteger()); + return new ExecutorRecord(ThreadPoolManager.getPoolBasedSequentialScheduledExecutorService("events", + "eventexecutor-" + executors.size()), new AtomicInteger()); } @Override diff --git a/bundles/org.openhab.core/src/test/java/org/openhab/core/common/ThreadPoolManagerTest.java b/bundles/org.openhab.core/src/test/java/org/openhab/core/common/ThreadPoolManagerTest.java index 6e968dd82..d3b90550d 100644 --- a/bundles/org.openhab.core/src/test/java/org/openhab/core/common/ThreadPoolManagerTest.java +++ b/bundles/org.openhab.core/src/test/java/org/openhab/core/common/ThreadPoolManagerTest.java @@ -17,13 +17,20 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.*; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jdt.annotation.NonNullByDefault; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; /** @@ -35,6 +42,138 @@ import org.junit.jupiter.api.Test; @NonNullByDefault public class ThreadPoolManagerTest { + @BeforeAll + public static void enableSequentialScheduledExecutorService() { + ThreadPoolManager manager = new ThreadPoolManager(); + manager.activate(Map.of("unit-test", "10")); + } + + @Test + public void testSequentialExecutorServiceAssumptions() { + Callable callable = () -> true; + Runnable runnable = () -> { + }; + + ScheduledExecutorService service = Executors.newScheduledThreadPool(1); + + assertTrue(service.submit(runnable) instanceof RunnableFuture); + assertTrue(service.submit(callable) instanceof RunnableFuture); + + assertTrue(service.schedule(runnable, 1, TimeUnit.SECONDS) instanceof RunnableFuture); + assertTrue(service.schedule(callable, 1, TimeUnit.SECONDS) instanceof RunnableFuture); + + var fixedRate = service.scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS); + + try { + assertTrue(fixedRate instanceof RunnableFuture); + } finally { + fixedRate.cancel(false); + } + + var fixedDelay = service.scheduleWithFixedDelay(runnable, 1, 1, TimeUnit.SECONDS); + + try { + assertTrue(fixedDelay instanceof RunnableFuture); + } finally { + fixedDelay.cancel(false); + } + + service.shutdownNow(); + } + + @Test + public void testExecutionIsSequentialInSequentialExecutorService() { + ScheduledExecutorService service = ThreadPoolManager.getPoolBasedSequentialScheduledExecutorService("unit-test", + "thread-1"); + + AtomicBoolean done = new AtomicBoolean(false); + + service.submit(() -> { + Thread.sleep(100); + + done.set(true); + + return null; + }); + + assertTrue(((CompletableFuture) service.submit(() -> done.get())).join()); + } + + @Test + public void testCancelDoesNotStopProcessingInSequentialExecutorService() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + ScheduledExecutorService service = ThreadPoolManager.getPoolBasedSequentialScheduledExecutorService("unit-test", + "thread-2"); + + service.submit(() -> { + Thread.sleep(100); + + return null; + }); + + service.submit(() -> null).cancel(false); + + service.submit(() -> { + latch.countDown(); + }); + + assertTrue(latch.await(1, TimeUnit.SECONDS)); + } + + @Test + public void testInstancesDoNotBlockEachOtherInSequentialExecutorService() throws InterruptedException { + ScheduledExecutorService serviceA = ThreadPoolManager + .getPoolBasedSequentialScheduledExecutorService("unit-test", "thread-3"); + ScheduledExecutorService serviceB = ThreadPoolManager + .getPoolBasedSequentialScheduledExecutorService("unit-test", "thread-4"); + + serviceA.submit(() -> { + Thread.sleep(100); + + return null; + }); + + CountDownLatch latch = new CountDownLatch(1); + + serviceB.submit(() -> { + latch.countDown(); + }); + + assertTrue(latch.await(800, TimeUnit.MILLISECONDS)); + } + + @Test + public void testSchedulingWorksInSequentialExecutorService() throws InterruptedException { + ScheduledExecutorService service = ThreadPoolManager.getPoolBasedSequentialScheduledExecutorService("unit-test", + "thread-5"); + CountDownLatch latch = new CountDownLatch(1); + + service.schedule(() -> { + latch.countDown(); + }, 200, TimeUnit.MILLISECONDS); + + assertTrue(latch.await(1, TimeUnit.SECONDS)); + } + + @Test + public void testSchedulingGetsBlockedByRegularTaskInSequentialExecutorService() throws InterruptedException { + ScheduledExecutorService service = ThreadPoolManager.getPoolBasedSequentialScheduledExecutorService("unit-test", + "thread-6"); + CountDownLatch latch = new CountDownLatch(1); + + service.submit(() -> { + Thread.sleep(200); + + return null; + }); + + service.schedule(() -> { + latch.countDown(); + }, 20, TimeUnit.MILLISECONDS); + + assertFalse(latch.await(100, TimeUnit.MILLISECONDS)); + } + @Test public void testGetScheduledPool() { ThreadPoolExecutor result = ThreadPoolManager.getScheduledPoolUnwrapped("test1"); @@ -136,4 +275,10 @@ public class ThreadPoolManagerTest { assertTrue(cdl.await(5, TimeUnit.SECONDS), "Checking if thread pool " + poolName + " works"); assertFalse(threadPool.isShutdown(), "Checking if thread pool is not shut down"); } + + @AfterAll + public static void disableSequentialScheduledExecutorService() { + ThreadPoolManager manager = new ThreadPoolManager(); + manager.activate(Map.of("unit-test", "0")); + } } diff --git a/tools/static-code-analysis/pmd/suppressions.properties b/tools/static-code-analysis/pmd/suppressions.properties index 6ee882d6a..cba6665f9 100644 --- a/tools/static-code-analysis/pmd/suppressions.properties +++ b/tools/static-code-analysis/pmd/suppressions.properties @@ -15,3 +15,4 @@ org.openhab.core.automation.internal.RuleEngineImpl=AvoidCatchingThrowable org.openhab.core.automation.internal.RuleRegistryImpl=CompareObjectsWithEquals org.openhab.core.automation.internal.provider.AutomationResourceBundlesEventQueue=AvoidCatchingThrowable org.openhab.core.io.console.karaf.internal.InstallServiceCommand=SystemPrintln +org.openhab.core.common.PoolBasedSequentialScheduledExecutorService=CompareObjectsWithEquals