From 75eafe84e2157341097a55e559d912657f2d916a Mon Sep 17 00:00:00 2001 From: Daniel Nishimura Date: Thu, 1 Aug 2019 15:33:47 -0700 Subject: [PATCH] SAMZA-2286: Integrate MetadataStore#putAll for improved startup time. --- .../grouper/task/TaskAssignmentManager.java | 42 ++++++++++++++ .../task/TaskPartitionAssignmentManager.java | 55 +++++++++++------- .../samza/storage/ChangelogStreamManager.java | 7 ++- .../org/apache/samza/zk/ZkJobCoordinator.java | 17 ++++-- .../samza/coordinator/JobModelManager.scala | 23 ++++---- .../task/TestTaskAssignmentManager.java | 57 +++++++++++++++---- .../TestTaskPartitionAssignmentManager.java | 16 +++--- .../coordinator/TestJobModelManager.java | 46 +++++++++------ .../storage/TestChangelogStreamManager.java | 51 ++++++++++++++++- .../apache/samza/zk/TestZkJobCoordinator.java | 24 +++++++- 10 files changed, 264 insertions(+), 74 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java index 16f8a5190e..b286964195 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java @@ -26,6 +26,7 @@ import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde; import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping; import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping; +import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.TaskMode; import org.apache.samza.metadatastore.MetadataStore; import org.apache.samza.serializers.Serde; @@ -101,7 +102,9 @@ public Map readTaskModes() { * @param taskName the task name * @param containerId the SamzaContainer ID or {@code null} to delete the mapping * @param taskMode the mode of the task + * @deprecated in favor of {@link #writeTaskContainerAssignments(Map)} */ + @Deprecated public void writeTaskContainerMapping(String taskName, String containerId, TaskMode taskMode) { String existingContainerId = taskNameToContainerId.get(taskName); if (existingContainerId != null && !existingContainerId.equals(containerId)) { @@ -121,6 +124,45 @@ public void writeTaskContainerMapping(String taskName, String containerId, TaskM } } + /** + * Method to write task-to-container mapping and the mode of the task to {@link MetadataStore} + * @param taskToContainerMapping the task name to container mapping. Container IDs that are null are deleted. + */ + public void writeTaskContainerAssignments(Map taskToContainerMapping) { + HashMap containerMapping = new HashMap<>(); + HashMap modeMapping = new HashMap<>(); + + for (String taskName : taskToContainerMapping.keySet()) { + ContainerModel container = taskToContainerMapping.get(taskName); + String containerId = container.getId(); + TaskMode taskMode = container.getTasks().get(new TaskName(taskName)).getTaskMode(); + + String existingContainerId = taskNameToContainerId.get(taskName); + if (existingContainerId != null && !existingContainerId.equals(containerId)) { + LOG.info("Task \"{}\" in mode {} moved from container {} to container {}", new Object[]{taskName, taskMode, existingContainerId, containerId}); + } else { + LOG.debug("Task \"{}\" in mode {} assigned to container {}", taskName, taskMode, containerId); + } + + if (containerId == null) { + LOG.info("Deleting task: {} from the task-to-container assignment in the metadata store", taskName); + taskContainerMappingMetadataStore.delete(taskName); + taskModeMappingMetadataStore.delete(taskName); + taskNameToContainerId.remove(taskName); + } else { + LOG.info("Assigning task: {} to container ID: {} in the metadata store", taskName, containerId); + containerMapping.put(taskName, containerIdSerde.toBytes(containerId)); + modeMapping.put(taskName, taskModeSerde.toBytes(taskMode.toString())); + taskNameToContainerId.put(taskName, containerId); + } + } + + if (!containerMapping.isEmpty()) { + taskContainerMappingMetadataStore.putAll(containerMapping); + taskModeMappingMetadataStore.putAll(modeMapping); + } + } + /** * Deletes the task container info from the {@link MetadataStore} for the task names. * diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java index 7e32f0a08e..83490f6217 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java @@ -19,6 +19,12 @@ package org.apache.samza.container.grouper.task; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.collections4.CollectionUtils; import org.apache.samza.SamzaException; import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde; import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping; @@ -30,10 +36,6 @@ import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; /** * Used to persist and read the task-to-partition assignment information @@ -69,25 +71,40 @@ public TaskPartitionAssignmentManager(MetadataStore metadataStore) { * Stores the task to partition assignments to the metadata store. * @param partition the system stream partition. * @param taskNames the task names to which the partition is assigned to. + * @deprecated in favor of {@link #writeTaskPartitionAssignments(Map)} */ + @Deprecated public void writeTaskPartitionAssignment(SystemStreamPartition partition, List taskNames) { - // For broadcast streams, a input system stream partition will be mapped to more than one tasks in a - // SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions in metadata store, here - // systemStreamPartition to list of taskNames is stored. This was done due to 1 MB limit on value size in kafka. - String serializedSSPAsJson = serializeSSPToJson(partition); - if (taskNames == null || taskNames.isEmpty()) { - LOG.info("Deleting the key: {} from the metadata store.", partition); - metadataStore.delete(serializedSSPAsJson); - } else { - try { - String taskNamesAsString = taskNamesMapper.writeValueAsString(taskNames); - byte[] taskNamesAsBytes = valueSerde.toBytes(taskNamesAsString); - LOG.info("Storing the partition: {} and taskNames: {} into the metadata store.", serializedSSPAsJson, taskNames); - metadataStore.put(serializedSSPAsJson, taskNamesAsBytes); - } catch (Exception e) { - throw new SamzaException("Exception occurred when writing task to partition assignment.", e); + writeTaskPartitionAssignments(ImmutableMap.of(partition, taskNames)); + } + + /** + * Stores the task names to {@link SystemStreamPartition} assignments to the metadata store. + * @param sspToTaskNameMapping the mapped assignments to write to the metadata store. If the task name list is empty, + * then the entry is deleted from the metadata store. + */ + public void writeTaskPartitionAssignments(Map> sspToTaskNameMapping) { + HashMap tasksMapping = new HashMap<>(); + for (SystemStreamPartition ssp: sspToTaskNameMapping.keySet()) { + // For broadcast streams, a input system stream partition will be mapped to more than one tasks in a + // SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions in metadata store, here + // systemStreamPartition to list of taskNames is stored. This was done due to 1 MB limit on value size in kafka. + List taskNames = sspToTaskNameMapping.get(ssp); + String serializedSSPAsJson = serializeSSPToJson(ssp); + if (CollectionUtils.isEmpty(taskNames)) { + LOG.info("Deleting the task assignment for partition: {} from the metadata store.", ssp); + metadataStore.delete(serializedSSPAsJson); + } else { + try { + String taskNamesAsString = taskNamesMapper.writeValueAsString(taskNames); + LOG.info("Assigning the partition: {} with taskNames: {} in the metadata store.", serializedSSPAsJson, taskNamesAsString); + tasksMapping.put(serializedSSPAsJson, valueSerde.toBytes(taskNamesAsString)); + } catch (IOException e) { + throw new SamzaException("Exception occurred when writing task to partition assignment.", e); + } } } + metadataStore.putAll(tasksMapping); } /** diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java index e86e21a60f..14c222ab95 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java @@ -90,18 +90,23 @@ public Map readPartitionMapping() { */ public void writePartitionMapping(Map changelogEntries) { LOG.debug("Updating changelog information with: "); + HashMap changelogEntriesToStore = new HashMap<>(); for (Map.Entry entry : changelogEntries.entrySet()) { Preconditions.checkNotNull(entry.getKey()); String taskName = entry.getKey().getTaskName(); if (entry.getValue() != null) { String changeLogPartitionId = String.valueOf(entry.getValue()); LOG.debug("TaskName: {} to Partition: {}", taskName, entry.getValue()); - metadataStore.put(taskName, valueSerde.toBytes(changeLogPartitionId)); + changelogEntriesToStore.put(taskName, valueSerde.toBytes(changeLogPartitionId)); } else { LOG.debug("Deleting the TaskName: {}", taskName); metadataStore.delete(taskName); } } + if (!changelogEntries.isEmpty()) { + LOG.info("Storing {} changelog partition assignments", changelogEntries.size()); + metadataStore.putAll(changelogEntriesToStore); + } } /** diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index dfd03044e8..d5c5c3cc72 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -29,6 +30,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.I0Itec.zkclient.IZkStateListener; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; @@ -316,15 +318,22 @@ void loadMetadataResources(JobModel jobModel) { metadataResourceUtil.createResources(); if (coordinatorStreamStore != null) { - // TODO: SAMZA-2273 - publish configs async CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE); NamespaceAwareCoordinatorStreamStore configStore = new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE); - for (Map.Entry entry : config.entrySet()) { - byte[] serializedValue = jsonSerde.toBytes(entry.getValue()); - configStore.put(entry.getKey(), serializedValue); + + // Delete configs from metadata store that are no longer needed. + Set configsToDelete = Sets.difference(configStore.all().keySet(), config.keySet()); + for (String configKeyToDelete : configsToDelete) { + LOG.debug("Deleting config: {}", configKeyToDelete); + configStore.delete(configKeyToDelete); } + // Publish current configs to metadata store. + Map configsToStore = + config.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> jsonSerde.toBytes(e.getValue()))); + configStore.putAll(configsToStore); + // fan out the startpoints StartpointManager startpointManager = createStartpointManager(); startpointManager.start(); diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index 2afc36bd86..dbff10e133 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -239,23 +239,24 @@ object JobModelManager extends Logging { // taskName to SystemStreamPartitions is done here to wire-in the data to {@see JobModel}. val sspToTaskNameMap: util.Map[SystemStreamPartition, util.List[String]] = new util.HashMap[SystemStreamPartition, util.List[String]]() + // The task to container mapping that will be written to the coordinator stream by the TaskAssignmentManager. + val taskToContainerModelMap: util.Map[String, ContainerModel] = new util.HashMap[String, ContainerModel]() + for (container <- jobModel.getContainers.values()) { for ((taskName, taskModel) <- container.getTasks) { - info ("Storing task: %s and container ID: %s into metadata store" format(taskName.getTaskName, container.getId)) - taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName, container.getId, container.getTasks.get(taskName).getTaskMode) - for (partition <- taskModel.getSystemStreamPartitions) { - if (!sspToTaskNameMap.containsKey(partition)) { - sspToTaskNameMap.put(partition, new util.ArrayList[String]()) - } - sspToTaskNameMap.get(partition).add(taskName.getTaskName) + taskToContainerModelMap.put(taskName.getTaskName, container) + for (ssp <- taskModel.getSystemStreamPartitions) { + sspToTaskNameMap.putIfAbsent(ssp, new util.ArrayList[String]()) + sspToTaskNameMap.get(ssp).add(taskName.getTaskName) } } } - for ((ssp, taskNames) <- sspToTaskNameMap) { - info ("Storing ssp: %s and task: %s into metadata store" format(ssp, taskNames)) - taskPartitionAssignmentManager.writeTaskPartitionAssignment(ssp, taskNames) - } + taskAssignmentManager.writeTaskContainerAssignments(taskToContainerModelMap) + info("Stored %d task-to-container assignments in the metadata store." format taskToContainerModelMap.size()) + + taskPartitionAssignmentManager.writeTaskPartitionAssignments(sspToTaskNameMap) + info("Stored %d partition-to-tasks assignments in the metadata store." format sspToTaskNameMap.size()) } /** diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java index 357e8ae4fb..afe9c21dec 100644 --- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java @@ -19,21 +19,28 @@ package org.apache.samza.container.grouper.task; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; +import org.apache.samza.container.TaskName; import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore; import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil; import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore; import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory; import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping; import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping; +import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.TaskMode; +import org.apache.samza.job.model.TaskModel; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -59,29 +66,59 @@ public void tearDown() { @Test public void testTaskAssignmentManager() { - Map expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1", "Task2", "2", "Task3", "0", "Task4", "1"); + String t0 = "Task0"; + String t1 = "Task1"; + String t2 = "Task2"; + String t3 = "Task3"; + String t4 = "Task4"; - for (Map.Entry entry : expectedMap.entrySet()) { - taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue(), TaskMode.Active); - } + ContainerModel c0 = createMockContainerModel("0", ImmutableList.of(t0, t3)); + ContainerModel c1 = createMockContainerModel("1", ImmutableList.of(t1, t4)); + ContainerModel c2 = createMockContainerModel("2", ImmutableList.of(t2)); + + ImmutableMap expectedMap = ImmutableMap.of(t0, c0, t1, c1, t2, c2, t3, c0, t4, c1); + + taskAssignmentManager.writeTaskContainerAssignments(expectedMap); Map localMap = taskAssignmentManager.readTaskAssignment(); - assertEquals(expectedMap, localMap); + assertEquals(expectedMap.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().getId())), localMap); taskAssignmentManager.close(); } + public ContainerModel createMockContainerModel(String containerId, List taskNames) { + ContainerModel mockContainerModel = Mockito.mock(ContainerModel.class); + HashMap taskModelMap = new HashMap<>(); + + for (String taskName : taskNames) { + TaskModel mockTaskModel = Mockito.mock(TaskModel.class); + TaskName task = new TaskName(taskName); + Mockito.doReturn(task).when(mockTaskModel).getTaskName(); + Mockito.doReturn(TaskMode.Active).when(mockTaskModel).getTaskMode(); + taskModelMap.put(task, mockTaskModel); + } + + Mockito.doReturn(containerId).when(mockContainerModel).getId(); + Mockito.doReturn(taskModelMap).when(mockContainerModel).getTasks(); + return mockContainerModel; + } + @Test public void testDeleteMappings() { - Map expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1"); + //Map expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1"); + String t0 = "Task0"; + String t1 = "Task1"; - for (Map.Entry entry : expectedMap.entrySet()) { - taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue(), TaskMode.Active); - } + ContainerModel c0 = createMockContainerModel("0", ImmutableList.of(t0)); + ContainerModel c1 = createMockContainerModel("1", ImmutableList.of(t1)); + + ImmutableMap expectedMap = ImmutableMap.of(t0, c0, t1, c1); + + taskAssignmentManager.writeTaskContainerAssignments(expectedMap); Map localMap = taskAssignmentManager.readTaskAssignment(); - assertEquals(expectedMap, localMap); + assertEquals(expectedMap.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().getId())), localMap); taskAssignmentManager.deleteTaskContainerMappings(localMap.keySet()); diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java index 1a7f0c9c84..f664353983 100644 --- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java @@ -63,7 +63,7 @@ public void tearDown() { @Test public void testReadAfterWrite() { List testTaskNames = ImmutableList.of("test-task1", "test-task2", "test-task3"); - taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames); + taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames)); Map> expectedMapping = ImmutableMap.of(testSystemStreamPartition, testTaskNames); Map> actualMapping = taskPartitionAssignmentManager.readTaskPartitionAssignments(); @@ -74,7 +74,7 @@ public void testReadAfterWrite() { @Test public void testDeleteAfterWrite() { List testTaskNames = ImmutableList.of("test-task1", "test-task2", "test-task3"); - taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames); + taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames)); Map> actualMapping = taskPartitionAssignmentManager.readTaskPartitionAssignments(); Assert.assertEquals(1, actualMapping.size()); @@ -94,9 +94,9 @@ public void testReadPartitionAssignments() { SystemStreamPartition testSystemStreamPartition3 = new SystemStreamPartition(TEST_SYSTEM, "stream-3", PARTITION); List testTaskNames3 = ImmutableList.of("test-task6", "test-task7", "test-task8"); - taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition1, testTaskNames1); - taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition2, testTaskNames2); - taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition3, testTaskNames3); + taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition1, testTaskNames1)); + taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition2, testTaskNames2)); + taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition3, testTaskNames3)); Map> expectedMapping = ImmutableMap.of(testSystemStreamPartition1, testTaskNames1, testSystemStreamPartition2, testTaskNames2, testSystemStreamPartition3, testTaskNames3); @@ -109,13 +109,13 @@ public void testReadPartitionAssignments() { public void testMultipleUpdatesReturnsTheMostRecentValue() { List testTaskNames1 = ImmutableList.of("test-task1", "test-task2", "test-task3"); - taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames1); + taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames1)); List testTaskNames2 = ImmutableList.of("test-task4", "test-task5"); - taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames2); + taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames2)); List testTaskNames3 = ImmutableList.of("test-task6", "test-task7", "test-task8"); - taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames3); + taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames3)); Map> expectedMapping = ImmutableMap.of(testSystemStreamPartition, testTaskNames3); Map> actualMapping = taskPartitionAssignmentManager.readTaskPartitionAssignments(); diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java index d58cf18d64..c8d0dcd548 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java @@ -21,10 +21,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; - -import java.util.*; - import com.google.common.collect.ImmutableSet; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.samza.Partition; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; @@ -52,12 +55,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.argThat; -import static org.mockito.Mockito.*; - import org.junit.runner.RunWith; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; @@ -66,6 +63,13 @@ import org.powermock.modules.junit4.PowerMockRunner; import scala.collection.JavaConversions; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** * Unit tests for {@link JobModelManager} */ @@ -270,7 +274,7 @@ public void testUpdateTaskAssignments() { when(mockJobModel.getContainers()).thenReturn(containerMapping); when(mockGrouperMetadata.getPreviousTaskToProcessorAssignment()).thenReturn(new HashMap<>()); - Mockito.doNothing().when(mockTaskAssignmentManager).writeTaskContainerMapping(Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.doNothing().when(mockTaskAssignmentManager).writeTaskContainerAssignments(Mockito.any()); JobModelManager.updateTaskAssignments(mockJobModel, mockTaskAssignmentManager, mockTaskPartitionAssignmentManager, mockGrouperMetadata); @@ -289,18 +293,24 @@ public void testUpdateTaskAssignments() { // Verifications Mockito.verify(mockJobModel, atLeast(1)).getContainers(); Mockito.verify(mockTaskAssignmentManager).deleteTaskContainerMappings(Mockito.any()); - Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-1", "test-container-id", TaskMode.Active); - Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-2", "test-container-id", TaskMode.Active); - Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-3", "test-container-id", TaskMode.Active); - Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-4", "test-container-id", TaskMode.Active); + ImmutableMap expectedTaskToContainerModelMapping = + ImmutableMap.of( + "task-1", containerModel, + "task-2", containerModel, + "task-3", containerModel, + "task-4", containerModel); + Mockito.verify(mockTaskAssignmentManager).writeTaskContainerAssignments(expectedTaskToContainerModelMapping); // Verify that the old, stale partition mappings had been purged in the coordinator stream. Mockito.verify(mockTaskPartitionAssignmentManager).delete(systemStreamPartitions); // Verify that the new task to partition assignment is stored in the coordinator stream. - Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignment(testSystemStreamPartition1, ImmutableList.of("task-1")); - Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignment(testSystemStreamPartition2, ImmutableList.of("task-2")); - Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignment(testSystemStreamPartition3, ImmutableList.of("task-3")); - Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignment(testSystemStreamPartition4, ImmutableList.of("task-4")); + ImmutableMap> expectedPartitionToTasksMapping = + ImmutableMap.of( + testSystemStreamPartition1, ImmutableList.of("task-1"), + testSystemStreamPartition2, ImmutableList.of("task-2"), + testSystemStreamPartition3, ImmutableList.of("task-3"), + testSystemStreamPartition4, ImmutableList.of("task-4")); + Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignments(expectedPartitionToTasksMapping); } } diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestChangelogStreamManager.java b/samza-core/src/test/java/org/apache/samza/storage/TestChangelogStreamManager.java index 1cb88c2672..83a22f5ea3 100644 --- a/samza-core/src/test/java/org/apache/samza/storage/TestChangelogStreamManager.java +++ b/samza-core/src/test/java/org/apache/samza/storage/TestChangelogStreamManager.java @@ -18,13 +18,19 @@ */ package org.apache.samza.storage; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore; +import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde; +import org.apache.samza.coordinator.stream.messages.SetChangelogMapping; +import org.apache.samza.metadatastore.InMemoryMetadataStore; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.StreamValidationException; @@ -35,9 +41,12 @@ import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamPartition; import org.junit.Test; +import org.mockito.ArgumentCaptor; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; public class TestChangelogStreamManager { @@ -62,6 +71,44 @@ public void createChangelogStreams() { ChangelogStreamManager.createChangelogStreams(config, MAX_CHANGELOG_STREAM_PARTITIONS); } + @Test + public void testWritePartitionMapping() { + InMemoryMetadataStore inMemoryMetadataStore = new InMemoryMetadataStore(); + NamespaceAwareCoordinatorStreamStore prevChangelogStreamStore = + new NamespaceAwareCoordinatorStreamStore(inMemoryMetadataStore, SetChangelogMapping.TYPE); + CoordinatorStreamValueSerde valueSerde = new CoordinatorStreamValueSerde(SetChangelogMapping.TYPE); + TaskName t0 = new TaskName("Task0"); + TaskName t1 = new TaskName("Task1"); + TaskName t2 = new TaskName("Task2"); + TaskName t3 = new TaskName("Task3"); + TaskName t4 = new TaskName("Task4"); + + prevChangelogStreamStore.put(t0.getTaskName(), valueSerde.toBytes("0")); + prevChangelogStreamStore.put(t1.getTaskName(), valueSerde.toBytes("1")); + prevChangelogStreamStore.put(t2.getTaskName(), valueSerde.toBytes("2")); + prevChangelogStreamStore.put(t3.getTaskName(), valueSerde.toBytes("3")); + prevChangelogStreamStore.put(t4.getTaskName(), valueSerde.toBytes("4")); + + NamespaceAwareCoordinatorStreamStore currentChangelogStreamStore = + spy(new NamespaceAwareCoordinatorStreamStore(inMemoryMetadataStore, SetChangelogMapping.TYPE)); + ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(currentChangelogStreamStore); + + HashMap newPartitionMapping = new HashMap<>(); + newPartitionMapping.put(t0, 5); + newPartitionMapping.put(t1, 6); + newPartitionMapping.put(t2, null); // delete + newPartitionMapping.put(t3, null); // delete + newPartitionMapping.put(t4, 7); + + changelogStreamManager.writePartitionMapping(newPartitionMapping); + + verify(currentChangelogStreamStore).delete(t2.getTaskName()); + verify(currentChangelogStreamStore).delete(t3.getTaskName()); + ArgumentCaptor argumentsCaptured = ArgumentCaptor.forClass(Map.class); + verify(currentChangelogStreamStore).putAll((Map) argumentsCaptured.capture()); + assertEquals(ImmutableSet.of(t0.getTaskName(), t1.getTaskName(), t4.getTaskName()), argumentsCaptured.getValue().keySet()); + } + public static class MockSystemAdminFactory implements SystemFactory { @Override public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java index d9d5379937..74d3ddd7aa 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.I0Itec.zkclient.ZkClient; import org.apache.samza.Partition; import org.apache.samza.SamzaException; @@ -35,6 +36,8 @@ import org.apache.samza.coordinator.MetadataResourceUtil; import org.apache.samza.coordinator.StreamPartitionCountMonitor; import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore; +import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde; +import org.apache.samza.coordinator.stream.messages.SetConfig; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; import org.apache.samza.job.model.TaskModel; @@ -47,10 +50,12 @@ import org.apache.zookeeper.Watcher; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertTrue; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.any; @@ -59,6 +64,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; @@ -265,7 +271,15 @@ public void testLoadMetadataResources() throws IOException { when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(jobModel); StartpointManager mockStartpointManager = Mockito.mock(StartpointManager.class); - + CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE); + HashMap currentCoordinatorConfigMap = + new HashMap<>(config.entrySet().stream().collect( + Collectors.toMap(e -> CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE, e.getKey()), e -> jsonSerde.toBytes(e.getValue())))); + HashMap prevCoordinatorConfigMap = new HashMap<>(currentCoordinatorConfigMap); + prevCoordinatorConfigMap.put(CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE, "deleteMe1"), "foo1".getBytes()); + prevCoordinatorConfigMap.put(CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE, "deleteMe2"), "foo2".getBytes()); + + doReturn(prevCoordinatorConfigMap).when(coordinatorStreamStore).all(); ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", config, new NoOpMetricsRegistry(), zkUtils, zkMetadataStore, coordinatorStreamStore)); doReturn(mockStartpointManager).when(zkJobCoordinator).createStartpointManager(); @@ -277,6 +291,14 @@ public void testLoadMetadataResources() throws IOException { zkJobCoordinator.loadMetadataResources(jobModel); + // Verify old keys are deleted and current keys are stored. + verify(coordinatorStreamStore, times(2)).delete(anyString()); + verify(coordinatorStreamStore).delete(CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE, "deleteMe1")); + verify(coordinatorStreamStore).delete(CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE, "deleteMe2")); + ArgumentCaptor argumentsCaptured = ArgumentCaptor.forClass(Map.class); + verify(coordinatorStreamStore).putAll((Map) argumentsCaptured.capture()); + assertEquals(currentCoordinatorConfigMap.keySet(), argumentsCaptured.getValue().keySet()); + verify(mockMetadataResourceUtil).createResources(); verify(mockStartpointManager).start(); verify(mockStartpointManager).fanOut(any());