Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.serializers.Serde;
Expand Down Expand Up @@ -101,7 +102,9 @@ public Map<TaskName, TaskMode> readTaskModes() {
* @param taskName the task name
* @param containerId the SamzaContainer ID or {@code null} to delete the mapping
* @param taskMode the mode of the task
* @deprecated in favor of {@link #writeTaskContainerAssignments(Map)}
*/
@Deprecated
public void writeTaskContainerMapping(String taskName, String containerId, TaskMode taskMode) {
String existingContainerId = taskNameToContainerId.get(taskName);
if (existingContainerId != null && !existingContainerId.equals(containerId)) {
Expand All @@ -121,6 +124,45 @@ public void writeTaskContainerMapping(String taskName, String containerId, TaskM
}
}

/**
 * Writes the task-to-container assignment and the task mode for every task in the given map to
 * the {@link MetadataStore}.
 *
 * <p>Puts are batched into a single {@code putAll} per store; deletes (entries whose container ID
 * is {@code null}) are issued individually since {@link MetadataStore} has no batched delete.
 *
 * @param taskToContainerMapping mapping from task name to the {@link ContainerModel} the task is
 *                               assigned to; entries whose container ID is {@code null} are
 *                               deleted from the store
 */
public void writeTaskContainerAssignments(Map<String, ContainerModel> taskToContainerMapping) {
  HashMap<String, byte[]> containerMapping = new HashMap<>();
  HashMap<String, byte[]> modeMapping = new HashMap<>();

  // Iterate entries directly to avoid a second lookup per task (keySet() + get()).
  for (Map.Entry<String, ContainerModel> entry : taskToContainerMapping.entrySet()) {
    String taskName = entry.getKey();
    ContainerModel container = entry.getValue();
    String containerId = container.getId();
    TaskMode taskMode = container.getTasks().get(new TaskName(taskName)).getTaskMode();

    String existingContainerId = taskNameToContainerId.get(taskName);
    if (existingContainerId != null && !existingContainerId.equals(containerId)) {
      // SLF4J supports varargs directly; no need to wrap the arguments in an Object[].
      LOG.info("Task \"{}\" in mode {} moved from container {} to container {}", taskName, taskMode, existingContainerId, containerId);
    } else {
      LOG.debug("Task \"{}\" in mode {} assigned to container {}", taskName, taskMode, containerId);
    }

    if (containerId == null) {
      // Null container ID means the assignment is being removed; delete from both stores and
      // drop the cached assignment.
      LOG.info("Deleting task: {} from the task-to-container assignment in the metadata store", taskName);
      taskContainerMappingMetadataStore.delete(taskName);
      taskModeMappingMetadataStore.delete(taskName);
      taskNameToContainerId.remove(taskName);
    } else {
      LOG.info("Assigning task: {} to container ID: {} in the metadata store", taskName, containerId);
      containerMapping.put(taskName, containerIdSerde.toBytes(containerId));
      modeMapping.put(taskName, taskModeSerde.toBytes(taskMode.toString()));
      taskNameToContainerId.put(taskName, containerId);
    }
  }

  // Skip the round trips entirely when every entry was a delete.
  if (!containerMapping.isEmpty()) {
    taskContainerMappingMetadataStore.putAll(containerMapping);
    taskModeMappingMetadataStore.putAll(modeMapping);
  }
}

/**
* Deletes the task container info from the {@link MetadataStore} for the task names.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
package org.apache.samza.container.grouper.task;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
Expand All @@ -30,10 +36,6 @@
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Used to persist and read the task-to-partition assignment information
Expand Down Expand Up @@ -69,25 +71,40 @@ public TaskPartitionAssignmentManager(MetadataStore metadataStore) {
* Stores the task to partition assignments to the metadata store.
* @param partition the system stream partition.
* @param taskNames the task names to which the partition is assigned to.
* @deprecated in favor of {@link #writeTaskPartitionAssignments(Map)}
*/
@Deprecated
public void writeTaskPartitionAssignment(SystemStreamPartition partition, List<String> taskNames) {
// For broadcast streams, a input system stream partition will be mapped to more than one tasks in a
// SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions in metadata store, here
// systemStreamPartition to list of taskNames is stored. This was done due to 1 MB limit on value size in kafka.
String serializedSSPAsJson = serializeSSPToJson(partition);
if (taskNames == null || taskNames.isEmpty()) {
LOG.info("Deleting the key: {} from the metadata store.", partition);
metadataStore.delete(serializedSSPAsJson);
} else {
try {
String taskNamesAsString = taskNamesMapper.writeValueAsString(taskNames);
byte[] taskNamesAsBytes = valueSerde.toBytes(taskNamesAsString);
LOG.info("Storing the partition: {} and taskNames: {} into the metadata store.", serializedSSPAsJson, taskNames);
metadataStore.put(serializedSSPAsJson, taskNamesAsBytes);
} catch (Exception e) {
throw new SamzaException("Exception occurred when writing task to partition assignment.", e);
writeTaskPartitionAssignments(ImmutableMap.of(partition, taskNames));
}

/**
 * Stores the task names to {@link SystemStreamPartition} assignments to the metadata store.
 *
 * <p>Writes are batched into a single {@code putAll}; deletes are issued individually since
 * {@link MetadataStore} has no batched delete.
 *
 * @param sspToTaskNameMapping the mapped assignments to write to the metadata store. If the task
 *                             name list for a partition is null or empty, then that entry is
 *                             deleted from the metadata store.
 * @throws SamzaException if the task name list cannot be serialized to JSON
 */
public void writeTaskPartitionAssignments(Map<SystemStreamPartition, List<String>> sspToTaskNameMapping) {
  HashMap<String, byte[]> tasksMapping = new HashMap<>();
  // Iterate entries directly to avoid a second lookup per partition (keySet() + get()).
  for (Map.Entry<SystemStreamPartition, List<String>> entry : sspToTaskNameMapping.entrySet()) {
    SystemStreamPartition ssp = entry.getKey();
    List<String> taskNames = entry.getValue();
    // For broadcast streams, an input system stream partition will be mapped to more than one
    // task in a SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions
    // in the metadata store, systemStreamPartition to list of taskNames is stored. This was done
    // due to the 1 MB limit on value size in kafka.
    String serializedSSPAsJson = serializeSSPToJson(ssp);
    if (CollectionUtils.isEmpty(taskNames)) {
      LOG.info("Deleting the task assignment for partition: {} from the metadata store.", ssp);
      metadataStore.delete(serializedSSPAsJson);
    } else {
      try {
        String taskNamesAsString = taskNamesMapper.writeValueAsString(taskNames);
        LOG.info("Assigning the partition: {} with taskNames: {} in the metadata store.", serializedSSPAsJson, taskNamesAsString);
        tasksMapping.put(serializedSSPAsJson, valueSerde.toBytes(taskNamesAsString));
      } catch (IOException e) {
        throw new SamzaException("Exception occurred when writing task to partition assignment.", e);
      }
    }
  }
  // Skip the round trip entirely when every entry was a delete (consistent with the other
  // assignment managers, which guard putAll with an isEmpty check).
  if (!tasksMapping.isEmpty()) {
    metadataStore.putAll(tasksMapping);
  }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,23 @@ public Map<TaskName, Integer> readPartitionMapping() {
*/
public void writePartitionMapping(Map<TaskName, Integer> changelogEntries) {
LOG.debug("Updating changelog information with: ");
HashMap<String, byte[]> changelogEntriesToStore = new HashMap<>();
for (Map.Entry<TaskName, Integer> entry : changelogEntries.entrySet()) {
Preconditions.checkNotNull(entry.getKey());
String taskName = entry.getKey().getTaskName();
if (entry.getValue() != null) {
String changeLogPartitionId = String.valueOf(entry.getValue());
LOG.debug("TaskName: {} to Partition: {}", taskName, entry.getValue());
metadataStore.put(taskName, valueSerde.toBytes(changeLogPartitionId));
changelogEntriesToStore.put(taskName, valueSerde.toBytes(changeLogPartitionId));
} else {
LOG.debug("Deleting the TaskName: {}", taskName);
metadataStore.delete(taskName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this change. But it would be beneficial to add a deleteAll API in MetadataStore (synonymous to putAll API). deleteAll API would simplify the iterative delete API invocations. If you agree, then it would be great to create a follow-up ticket and add a todo here.

}
}
if (!changelogEntries.isEmpty()) {
LOG.info("Storing {} changelog partition assignments", changelogEntries.size());
metadataStore.putAll(changelogEntriesToStore);
}
}

/**
Expand Down
17 changes: 13 additions & 4 deletions samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -29,6 +30,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.I0Itec.zkclient.IZkStateListener;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
Expand Down Expand Up @@ -316,15 +318,22 @@ void loadMetadataResources(JobModel jobModel) {
metadataResourceUtil.createResources();

if (coordinatorStreamStore != null) {
// TODO: SAMZA-2273 - publish configs async
CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
NamespaceAwareCoordinatorStreamStore configStore =
new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE);
for (Map.Entry<String, String> entry : config.entrySet()) {
byte[] serializedValue = jsonSerde.toBytes(entry.getValue());
configStore.put(entry.getKey(), serializedValue);

// Delete configs from metadata store that are no longer needed.
Set<String> configsToDelete = Sets.difference(configStore.all().keySet(), config.keySet());
for (String configKeyToDelete : configsToDelete) {
LOG.debug("Deleting config: {}", configKeyToDelete);
configStore.delete(configKeyToDelete);
}

// Publish current configs to metadata store.
Map<String, byte[]> configsToStore =
config.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> jsonSerde.toBytes(e.getValue())));
configStore.putAll(configsToStore);

// fan out the startpoints
StartpointManager startpointManager = createStartpointManager();
startpointManager.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,23 +239,24 @@ object JobModelManager extends Logging {
// taskName to SystemStreamPartitions is done here to wire-in the data to {@see JobModel}.
val sspToTaskNameMap: util.Map[SystemStreamPartition, util.List[String]] = new util.HashMap[SystemStreamPartition, util.List[String]]()

// The task to container mapping that will be written to the coordinator stream by the TaskAssignmentManager.
val taskToContainerModelMap: util.Map[String, ContainerModel] = new util.HashMap[String, ContainerModel]()

for (container <- jobModel.getContainers.values()) {
for ((taskName, taskModel) <- container.getTasks) {
info ("Storing task: %s and container ID: %s into metadata store" format(taskName.getTaskName, container.getId))
taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName, container.getId, container.getTasks.get(taskName).getTaskMode)
for (partition <- taskModel.getSystemStreamPartitions) {
if (!sspToTaskNameMap.containsKey(partition)) {
sspToTaskNameMap.put(partition, new util.ArrayList[String]())
}
sspToTaskNameMap.get(partition).add(taskName.getTaskName)
taskToContainerModelMap.put(taskName.getTaskName, container)
for (ssp <- taskModel.getSystemStreamPartitions) {
sspToTaskNameMap.putIfAbsent(ssp, new util.ArrayList[String]())
sspToTaskNameMap.get(ssp).add(taskName.getTaskName)
}
}
}

for ((ssp, taskNames) <- sspToTaskNameMap) {
info ("Storing ssp: %s and task: %s into metadata store" format(ssp, taskNames))
taskPartitionAssignmentManager.writeTaskPartitionAssignment(ssp, taskNames)
}
taskAssignmentManager.writeTaskContainerAssignments(taskToContainerModelMap)
info("Stored %d task-to-container assignments in the metadata store." format taskToContainerModelMap.size())

taskPartitionAssignmentManager.writeTaskPartitionAssignments(sspToTaskNameMap)
info("Stored %d partition-to-tasks assignments in the metadata store." format sspToTaskNameMap.size())
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,28 @@

package org.apache.samza.container.grouper.task;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand All @@ -59,29 +66,59 @@ public void tearDown() {

@Test
public void testTaskAssignmentManager() {
  String t0 = "Task0";
  String t1 = "Task1";
  String t2 = "Task2";
  String t3 = "Task3";
  String t4 = "Task4";

  ContainerModel c0 = createMockContainerModel("0", ImmutableList.of(t0, t3));
  ContainerModel c1 = createMockContainerModel("1", ImmutableList.of(t1, t4));
  ContainerModel c2 = createMockContainerModel("2", ImmutableList.of(t2));

  ImmutableMap<String, ContainerModel> expectedMap = ImmutableMap.of(t0, c0, t1, c1, t2, c2, t3, c0, t4, c1);

  taskAssignmentManager.writeTaskContainerAssignments(expectedMap);

  Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();

  // readTaskAssignment returns task name -> container ID; project the expected map down to IDs.
  assertEquals(expectedMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getId())), localMap);

  taskAssignmentManager.close();
}

public ContainerModel createMockContainerModel(String containerId, List<String> taskNames) {
  // Build one mocked active-mode TaskModel per task name, keyed by its TaskName.
  HashMap<TaskName, TaskModel> tasks = new HashMap<>();
  for (String name : taskNames) {
    TaskName key = new TaskName(name);
    TaskModel taskModel = Mockito.mock(TaskModel.class);
    Mockito.when(taskModel.getTaskName()).thenReturn(key);
    Mockito.when(taskModel.getTaskMode()).thenReturn(TaskMode.Active);
    tasks.put(key, taskModel);
  }

  // Wrap the task map in a mocked ContainerModel exposing the requested container ID.
  ContainerModel containerModel = Mockito.mock(ContainerModel.class);
  Mockito.when(containerModel.getId()).thenReturn(containerId);
  Mockito.when(containerModel.getTasks()).thenReturn(tasks);
  return containerModel;
}

@Test
public void testDeleteMappings() {
Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1");
//Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1");
String t0 = "Task0";
String t1 = "Task1";

for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue(), TaskMode.Active);
}
ContainerModel c0 = createMockContainerModel("0", ImmutableList.of(t0));
ContainerModel c1 = createMockContainerModel("1", ImmutableList.of(t1));

ImmutableMap<String, ContainerModel> expectedMap = ImmutableMap.of(t0, c0, t1, c1);

taskAssignmentManager.writeTaskContainerAssignments(expectedMap);

Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
assertEquals(expectedMap, localMap);
assertEquals(expectedMap.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().getId())), localMap);

taskAssignmentManager.deleteTaskContainerMappings(localMap.keySet());

Expand Down
Loading