Skip to content
Snippets Groups Projects
Commit 70bbe6da authored by Ryan Izard's avatar Ryan Izard
Browse files

Updated all unit tests with the uncommenting of debugcounter stuff in the...

Updated all unit tests with the uncommenting of debugcounter stuff in the previous commit. For the most part, initialized the MockDebugCounterService and provided it to the MemoryStorageSource to allow counters in the AbstractStorageSourceService.
parent d1dc0436
No related branches found
No related tags found
No related merge requests found
Showing
with 567 additions and 526 deletions
......@@ -679,7 +679,7 @@ public class Device implements IDevice {
if (e.switchDPID.equals(swp.getSwitchDPID())
&& e.switchPort.equals(swp.getPort())) {
if (e.getVlan() == null)
vals.add(VlanVid.ofVlan(-1)); //TODO @Ryan is this the correct way to represent an untagged vlan?
vals.add(VlanVid.ofVlan(-1)); //TODO Update all -1 VLANs (untagged) to the new VlanVid.ZERO
else
vals.add(e.getVlan());
}
......
......@@ -266,10 +266,9 @@ public abstract class ForwardingBase implements IOFMessageListener {
.setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
.setBufferId(OFBufferId.NO_BUFFER)
.setCookie(cookie)
.setOutPort(outPort); // TODO @Ryan why does this need to be set in addition to the action???
.setOutPort(outPort);
try {
//TODO @Ryan counterStore.updatePktOutFMCounterStoreLocal(sw, fm);
if (log.isTraceEnabled()) {
log.trace("Pushing Route flowmod routeIndx={} " +
"sw={} inPort={} outPort={}",
......@@ -281,7 +280,6 @@ public abstract class ForwardingBase implements IOFMessageListener {
messageDamper.write(sw, fmb.build());
if (doFlush) {
sw.flush();
//TODO @Ryan counterStore.updateFlush();
}
// Push the packet out the source switch
......@@ -385,7 +383,6 @@ public abstract class ForwardingBase implements IOFMessageListener {
pob.setInPort((pi.getVersion().compareTo(OFVersion.OF_12) < 0 ? pi.getInPort() : pi.getMatch().get(MatchField.IN_PORT)));
try {
//TODO @Ryan counterStore.updatePktOutFMCounterStoreLocal(sw, po);
messageDamper.write(sw, pob.build());
} catch (IOException e) {
log.error("Failure writing packet out", e);
......@@ -422,7 +419,6 @@ public abstract class ForwardingBase implements IOFMessageListener {
pob.setData(packetData);
try {
//TODO @Ryan counterStore.updatePktOutFMCounterStoreLocal(sw, po);
if (log.isTraceEnabled()) {
log.trace("write broadcast packet on switch-id={} " +
"interfaces={} packet-out={}",
......
......@@ -162,7 +162,7 @@ public class StaticFlowEntries {
if (fm.getInstructions() != null) {
List<OFInstruction> instructions = fm.getInstructions();
for (OFInstruction inst : instructions) {
switch (inst.getType()) { //TODO @Ryan look into replacing with an instanceof construct
switch (inst.getType()) {
case GOTO_TABLE:
entry.put(StaticFlowEntryPusher.COLUMN_INSTR_GOTO_TABLE, InstructionUtils.gotoTableToString(((OFInstructionGotoTable) inst), log));
break;
......@@ -357,7 +357,7 @@ public class StaticFlowEntries {
case StaticFlowEntryPusher.COLUMN_ACTIVE:
entry.put(StaticFlowEntryPusher.COLUMN_ACTIVE, jp.getText());
break;
case StaticFlowEntryPusher.COLUMN_IDLE_TIMEOUT: // TODO @Ryan always store TO's, but conditionally push them (the conditional push hasn't been done yet)
case StaticFlowEntryPusher.COLUMN_IDLE_TIMEOUT:
entry.put(StaticFlowEntryPusher.COLUMN_IDLE_TIMEOUT, jp.getText());
break;
case StaticFlowEntryPusher.COLUMN_HARD_TIMEOUT:
......
......@@ -342,7 +342,6 @@ implements IOFSwitchListener, IFloodlightModule, IStaticFlowEntryPusherService,
}
// get the correct builder for the OF version supported by the switch
// TODO @Ryan this should arguably be a FlowAdd, not a FlowModify, but it really doesn't matter
fmb = OFFactories.getFactory(switchService.getSwitch(DatapathId.of(switchName)).getOFFactory().getVersion()).buildFlowModify();
StaticFlowEntries.initDefaultFlowMod(fmb, entryName);
......@@ -404,7 +403,6 @@ implements IOFSwitchListener, IFloodlightModule, IStaticFlowEntryPusherService,
String match = matchString.toString();
try {
//TODO @Ryan new fromString() method here. Should verify it especially
fmb.setMatch(MatchUtils.fromString(match, fmb.getVersion()));
} catch (IllegalArgumentException e) {
log.debug("ignoring flow entry {} on switch {} with illegal OFMatch() key: " + match, entryName, switchName);
......
/**
* Copyright 2011, Big Switch Networks, Inc.
* Originally created by David Erickson, Stanford University
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
**/
* Copyright 2011, Big Switch Networks, Inc.
* Originally created by David Erickson, Stanford University
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
**/
package net.floodlightcontroller.storage;
......@@ -36,7 +36,9 @@ import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.debugcounter.IDebugCounter;
import net.floodlightcontroller.debugcounter.IDebugCounterService;
import net.floodlightcontroller.debugcounter.IDebugCounterService.MetaData;
import net.floodlightcontroller.restserver.IRestApiService;
import net.floodlightcontroller.storage.web.StorageWebRoutable;
......@@ -46,491 +48,496 @@ import org.slf4j.LoggerFactory;
@LogMessageCategory("System Database")
public abstract class AbstractStorageSource
implements IStorageSourceService, IFloodlightModule {
protected static Logger logger = LoggerFactory.getLogger(AbstractStorageSource.class);
// Shared instance of the executor to use to execute the storage tasks.
// We make this a single threaded executor, because if we used a thread pool
// then storage operations could be executed out of order which would cause
// problems in some cases (e.g. delete and update of a row getting reordered).
// If we wanted to make this more multi-threaded we could have multiple
// worker threads/executors with affinity of operations on a given table
// to a single worker thread. But for now, we'll keep it simple and just have
// a single thread for all operations.
protected static ExecutorService defaultExecutorService = Executors.newSingleThreadExecutor();
protected final static String STORAGE_QUERY_COUNTER_NAME = "StorageQuery";
protected final static String STORAGE_UPDATE_COUNTER_NAME = "StorageUpdate";
protected final static String STORAGE_DELETE_COUNTER_NAME = "StorageDelete";
protected Set<String> allTableNames = new CopyOnWriteArraySet<String>();
protected IDebugCounterService debugCounterService;
protected ExecutorService executorService = defaultExecutorService;
protected IStorageExceptionHandler exceptionHandler;
private Map<String, Set<IStorageSourceListener>> listeners =
new ConcurrentHashMap<String, Set<IStorageSourceListener>>();
// Our dependencies
protected IRestApiService restApi = null;
protected static final String DB_ERROR_EXPLANATION =
"An unknown error occurred while executing asynchronous " +
"database operation";
@LogMessageDoc(level="ERROR",
message="Failure in asynchronous call to executeQuery",
explanation=DB_ERROR_EXPLANATION,
recommendation=LogMessageDoc.GENERIC_ACTION)
abstract class StorageCallable<V> implements Callable<V> {
public V call() {
try {
return doStorageOperation();
}
catch (StorageException e) {
logger.error("Failure in asynchronous call to executeQuery", e);
if (exceptionHandler != null)
exceptionHandler.handleException(e);
throw e;
}
}
abstract protected V doStorageOperation();
}
@LogMessageDoc(level="ERROR",
message="Failure in asynchronous call to updateRows",
explanation=DB_ERROR_EXPLANATION,
recommendation=LogMessageDoc.GENERIC_ACTION)
abstract class StorageRunnable implements Runnable {
public void run() {
try {
doStorageOperation();
}
catch (StorageException e) {
logger.error("Failure in asynchronous call to updateRows", e);
if (exceptionHandler != null)
exceptionHandler.handleException(e);
throw e;
}
}
abstract void doStorageOperation();
}
public AbstractStorageSource() {
this.executorService = defaultExecutorService;
}
public void setExecutorService(ExecutorService executorService) {
this.executorService = (executorService != null) ?
executorService : defaultExecutorService;
}
@Override
public void setExceptionHandler(IStorageExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
}
@Override
public abstract void setTablePrimaryKeyName(String tableName, String primaryKeyName);
@Override
public void createTable(String tableName, Set<String> indexedColumns) {
allTableNames.add(tableName);
}
@Override
public Set<String> getAllTableNames() {
return allTableNames;
}
public void setDebugCounterService(IDebugCounterService dcs) {
debugCounterService = dcs;
}
protected void updateCounters(String baseName, String tableName) {
/*if (debugCounterService != null) {
String counterName;
if (tableName != null) {
updateCounters(baseName, null);
counterName = baseName + "__" + tableName; //TODO @Ryan __ was CounterStore.Title
} else {
counterName = baseName;
}
TODO @Ryan not sure what to do about this counter. It seems different than debug counters.
* IDebugCounter counter = debugCounterService.getCounter(counterName);
if (counter == null) {
counter = counterStore.createCounter(counterName, CounterType.LONG);
}
counter.increment();
}*/
}
@Override
public abstract IQuery createQuery(String tableName, String[] columnNames,
IPredicate predicate, RowOrdering ordering);
@Override
public IResultSet executeQuery(IQuery query) {
updateCounters(STORAGE_QUERY_COUNTER_NAME, query.getTableName());
return executeQueryImpl(query);
}
protected abstract IResultSet executeQueryImpl(IQuery query);
@Override
public IResultSet executeQuery(String tableName, String[] columnNames,
IPredicate predicate, RowOrdering ordering) {
IQuery query = createQuery(tableName, columnNames, predicate, ordering);
IResultSet resultSet = executeQuery(query);
return resultSet;
}
@Override
public Object[] executeQuery(String tableName, String[] columnNames,
IPredicate predicate, RowOrdering ordering, IRowMapper rowMapper) {
List<Object> objectList = new ArrayList<Object>();
IResultSet resultSet = executeQuery(tableName, columnNames, predicate, ordering);
while (resultSet.next()) {
Object object = rowMapper.mapRow(resultSet);
objectList.add(object);
}
return objectList.toArray();
}
@Override
public Future<IResultSet> executeQueryAsync(final IQuery query) {
Future<IResultSet> future = executorService.submit(
new StorageCallable<IResultSet>() {
public IResultSet doStorageOperation() {
return executeQuery(query);
}
});
return future;
}
@Override
public Future<IResultSet> executeQueryAsync(final String tableName,
final String[] columnNames, final IPredicate predicate,
final RowOrdering ordering) {
Future<IResultSet> future = executorService.submit(
new StorageCallable<IResultSet>() {
public IResultSet doStorageOperation() {
return executeQuery(tableName, columnNames,
predicate, ordering);
}
});
return future;
}
@Override
public Future<Object[]> executeQueryAsync(final String tableName,
final String[] columnNames, final IPredicate predicate,
final RowOrdering ordering, final IRowMapper rowMapper) {
Future<Object[]> future = executorService.submit(
new StorageCallable<Object[]>() {
public Object[] doStorageOperation() {
return executeQuery(tableName, columnNames, predicate,
ordering, rowMapper);
}
});
return future;
}
@Override
public Future<?> insertRowAsync(final String tableName,
final Map<String,Object> values) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
insertRow(tableName, values);
}
}, null);
return future;
}
@Override
public Future<?> updateRowsAsync(final String tableName, final List<Map<String,Object>> rows) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
updateRows(tableName, rows);
}
}, null);
return future;
}
@Override
public Future<?> updateMatchingRowsAsync(final String tableName,
final IPredicate predicate, final Map<String,Object> values) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
updateMatchingRows(tableName, predicate, values);
}
}, null);
return future;
}
@Override
public Future<?> updateRowAsync(final String tableName,
final Object rowKey, final Map<String,Object> values) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
updateRow(tableName, rowKey, values);
}
}, null);
return future;
}
@Override
public Future<?> updateRowAsync(final String tableName,
final Map<String,Object> values) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
updateRow(tableName, values);
}
}, null);
return future;
}
@Override
public Future<?> deleteRowAsync(final String tableName, final Object rowKey) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
deleteRow(tableName, rowKey);
}
}, null);
return future;
}
@Override
public Future<?> deleteRowsAsync(final String tableName, final Set<Object> rowKeys) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
deleteRows(tableName, rowKeys);
}
}, null);
return future;
}
@Override
public Future<?> deleteMatchingRowsAsync(final String tableName, final IPredicate predicate) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
deleteMatchingRows(tableName, predicate);
}
}, null);
return future;
}
@Override
public Future<?> getRowAsync(final String tableName, final Object rowKey) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
getRow(tableName, rowKey);
}
}, null);
return future;
}
@Override
public Future<?> saveAsync(final IResultSet resultSet) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
resultSet.save();
}
}, null);
return future;
}
@Override
public void insertRow(String tableName, Map<String, Object> values) {
updateCounters(STORAGE_UPDATE_COUNTER_NAME, tableName);
insertRowImpl(tableName, values);
}
protected abstract void insertRowImpl(String tableName, Map<String, Object> values);
@Override
public void updateRows(String tableName, List<Map<String,Object>> rows) {
updateCounters(STORAGE_UPDATE_COUNTER_NAME, tableName);
updateRowsImpl(tableName, rows);
}
protected abstract void updateRowsImpl(String tableName, List<Map<String,Object>> rows);
@Override
public void updateMatchingRows(String tableName, IPredicate predicate,
Map<String, Object> values) {
updateCounters(STORAGE_UPDATE_COUNTER_NAME, tableName);
updateMatchingRowsImpl(tableName, predicate, values);
}
protected abstract void updateMatchingRowsImpl(String tableName, IPredicate predicate,
Map<String, Object> values);
@Override
public void updateRow(String tableName, Object rowKey,
Map<String, Object> values) {
updateCounters(STORAGE_UPDATE_COUNTER_NAME, tableName);
updateRowImpl(tableName, rowKey, values);
}
protected abstract void updateRowImpl(String tableName, Object rowKey,
Map<String, Object> values);
@Override
public void updateRow(String tableName, Map<String, Object> values) {
updateCounters(STORAGE_UPDATE_COUNTER_NAME, tableName);
updateRowImpl(tableName, values);
}
protected abstract void updateRowImpl(String tableName, Map<String, Object> values);
@Override
public void deleteRow(String tableName, Object rowKey) {
updateCounters(STORAGE_DELETE_COUNTER_NAME, tableName);
deleteRowImpl(tableName, rowKey);
}
protected abstract void deleteRowImpl(String tableName, Object rowKey);
@Override
public void deleteRows(String tableName, Set<Object> rowKeys) {
updateCounters(STORAGE_DELETE_COUNTER_NAME, tableName);
deleteRowsImpl(tableName, rowKeys);
}
protected abstract void deleteRowsImpl(String tableName, Set<Object> rowKeys);
@Override
public void deleteMatchingRows(String tableName, IPredicate predicate) {
IResultSet resultSet = null;
try {
resultSet = executeQuery(tableName, null, predicate, null);
while (resultSet.next()) {
resultSet.deleteRow();
}
resultSet.save();
}
finally {
if (resultSet != null)
resultSet.close();
}
}
@Override
public IResultSet getRow(String tableName, Object rowKey) {
updateCounters(STORAGE_QUERY_COUNTER_NAME, tableName);
return getRowImpl(tableName, rowKey);
}
protected abstract IResultSet getRowImpl(String tableName, Object rowKey);
@Override
public synchronized void addListener(String tableName, IStorageSourceListener listener) {
Set<IStorageSourceListener> tableListeners = listeners.get(tableName);
if (tableListeners == null) {
tableListeners = new CopyOnWriteArraySet<IStorageSourceListener>();
listeners.put(tableName, tableListeners);
}
tableListeners.add(listener);
}
@Override
public synchronized void removeListener(String tableName, IStorageSourceListener listener) {
Set<IStorageSourceListener> tableListeners = listeners.get(tableName);
if (tableListeners != null) {
tableListeners.remove(listener);
}
}
@LogMessageDoc(level="ERROR",
message="Exception caught handling storage notification",
explanation="An unknown error occured while trying to notify" +
" storage listeners",
recommendation=LogMessageDoc.GENERIC_ACTION)
protected synchronized void notifyListeners(StorageSourceNotification notification) {
if (logger.isTraceEnabled()) {
logger.trace("Notifying storage listeneres: {}", notification);
}
String tableName = notification.getTableName();
Set<Object> keys = notification.getKeys();
Set<IStorageSourceListener> tableListeners = listeners.get(tableName);
if (tableListeners != null) {
for (IStorageSourceListener listener : tableListeners) {
try {
switch (notification.getAction()) {
case MODIFY:
listener.rowsModified(tableName, keys);
break;
case DELETE:
listener.rowsDeleted(tableName, keys);
break;
}
}
catch (Exception e) {
logger.error("Exception caught handling storage notification", e);
}
}
}
}
@Override
public void notifyListeners(List<StorageSourceNotification> notifications) {
for (StorageSourceNotification notification : notifications)
notifyListeners(notification);
}
// IFloodlightModule
@Override
public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>();
l.add(IStorageSourceService.class);
return l;
}
@Override
public Map<Class<? extends IFloodlightService>,
IFloodlightService> getServiceImpls() {
Map<Class<? extends IFloodlightService>,
IFloodlightService> m =
new HashMap<Class<? extends IFloodlightService>,
IFloodlightService>();
m.put(IStorageSourceService.class, this);
return m;
}
@Override
public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>();
l.add(IRestApiService.class);
l.add(IDebugCounterService.class);
return l;
}
@Override
public void init(FloodlightModuleContext context)
throws FloodlightModuleException {
restApi =
context.getServiceImpl(IRestApiService.class);
debugCounterService =
context.getServiceImpl(IDebugCounterService.class);
}
@Override
public void startUp(FloodlightModuleContext context) {
restApi.addRestletRoutable(new StorageWebRoutable());
}
implements IStorageSourceService, IFloodlightModule {
protected static Logger logger = LoggerFactory.getLogger(AbstractStorageSource.class);
// Shared instance of the executor to use to execute the storage tasks.
// We make this a single threaded executor, because if we used a thread pool
// then storage operations could be executed out of order which would cause
// problems in some cases (e.g. delete and update of a row getting reordered).
// If we wanted to make this more multi-threaded we could have multiple
// worker threads/executors with affinity of operations on a given table
// to a single worker thread. But for now, we'll keep it simple and just have
// a single thread for all operations.
protected static ExecutorService defaultExecutorService = Executors.newSingleThreadExecutor();
protected final static String STORAGE_QUERY_COUNTER_NAME = "StorageQuery";
protected final static String STORAGE_UPDATE_COUNTER_NAME = "StorageUpdate";
protected final static String STORAGE_DELETE_COUNTER_NAME = "StorageDelete";
protected Set<String> allTableNames = new CopyOnWriteArraySet<String>();
protected ExecutorService executorService = defaultExecutorService;
protected IStorageExceptionHandler exceptionHandler;
protected IDebugCounterService debugCounterService;
private Map<String, IDebugCounter> debugCounters = new HashMap<String, IDebugCounter>();
private Map<String, Set<IStorageSourceListener>> listeners =
new ConcurrentHashMap<String, Set<IStorageSourceListener>>();
// Our dependencies
protected IRestApiService restApi = null;
protected static final String DB_ERROR_EXPLANATION =
"An unknown error occurred while executing asynchronous " +
"database operation";
@LogMessageDoc(level="ERROR",
message="Failure in asynchronous call to executeQuery",
explanation=DB_ERROR_EXPLANATION,
recommendation=LogMessageDoc.GENERIC_ACTION)
abstract class StorageCallable<V> implements Callable<V> {
public V call() {
try {
return doStorageOperation();
}
catch (StorageException e) {
logger.error("Failure in asynchronous call to executeQuery", e);
if (exceptionHandler != null)
exceptionHandler.handleException(e);
throw e;
}
}
abstract protected V doStorageOperation();
}
@LogMessageDoc(level="ERROR",
message="Failure in asynchronous call to updateRows",
explanation=DB_ERROR_EXPLANATION,
recommendation=LogMessageDoc.GENERIC_ACTION)
abstract class StorageRunnable implements Runnable {
public void run() {
try {
doStorageOperation();
}
catch (StorageException e) {
logger.error("Failure in asynchronous call to updateRows", e);
if (exceptionHandler != null)
exceptionHandler.handleException(e);
throw e;
}
}
abstract void doStorageOperation();
}
public AbstractStorageSource() {
this.executorService = defaultExecutorService;
}
public void setExecutorService(ExecutorService executorService) {
this.executorService = (executorService != null) ?
executorService : defaultExecutorService;
}
@Override
public void setExceptionHandler(IStorageExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
}
@Override
public abstract void setTablePrimaryKeyName(String tableName, String primaryKeyName);
@Override
public void createTable(String tableName, Set<String> indexedColumns) {
allTableNames.add(tableName);
}
@Override
public Set<String> getAllTableNames() {
return allTableNames;
}
public void setDebugCounterService(IDebugCounterService dcs) {
debugCounterService = dcs;
}
protected void updateCounters(String tableOpType, String tableName) {
String counterName = tableName + "__" + tableOpType;
IDebugCounter counter = debugCounters.get(counterName);
if (counter == null) {
counter = debugCounterService.registerCounter(this.getClass().getCanonicalName(), counterName, counterName, MetaData.WARN);
debugCounters.put(counterName, counter); // maintain a list of the counters as the tables register with the storage source service
}
counter.increment();
/*
* Now, do the counter for the base only (general update, add, or delete operation)
*/
counter = debugCounters.get(tableOpType);
if (counter == null) {
counter = debugCounterService.registerCounter(this.getClass().getCanonicalName(), tableOpType, tableOpType, MetaData.WARN);
debugCounters.put(tableOpType, counter);
}
counter.increment();
}
@Override
public abstract IQuery createQuery(String tableName, String[] columnNames,
IPredicate predicate, RowOrdering ordering);
@Override
public IResultSet executeQuery(IQuery query) {
updateCounters(STORAGE_QUERY_COUNTER_NAME, query.getTableName());
return executeQueryImpl(query);
}
protected abstract IResultSet executeQueryImpl(IQuery query);
@Override
public IResultSet executeQuery(String tableName, String[] columnNames,
IPredicate predicate, RowOrdering ordering) {
IQuery query = createQuery(tableName, columnNames, predicate, ordering);
IResultSet resultSet = executeQuery(query);
return resultSet;
}
@Override
public Object[] executeQuery(String tableName, String[] columnNames,
IPredicate predicate, RowOrdering ordering, IRowMapper rowMapper) {
List<Object> objectList = new ArrayList<Object>();
IResultSet resultSet = executeQuery(tableName, columnNames, predicate, ordering);
while (resultSet.next()) {
Object object = rowMapper.mapRow(resultSet);
objectList.add(object);
}
return objectList.toArray();
}
@Override
public Future<IResultSet> executeQueryAsync(final IQuery query) {
Future<IResultSet> future = executorService.submit(
new StorageCallable<IResultSet>() {
public IResultSet doStorageOperation() {
return executeQuery(query);
}
});
return future;
}
@Override
public Future<IResultSet> executeQueryAsync(final String tableName,
final String[] columnNames, final IPredicate predicate,
final RowOrdering ordering) {
Future<IResultSet> future = executorService.submit(
new StorageCallable<IResultSet>() {
public IResultSet doStorageOperation() {
return executeQuery(tableName, columnNames,
predicate, ordering);
}
});
return future;
}
@Override
public Future<Object[]> executeQueryAsync(final String tableName,
final String[] columnNames, final IPredicate predicate,
final RowOrdering ordering, final IRowMapper rowMapper) {
Future<Object[]> future = executorService.submit(
new StorageCallable<Object[]>() {
public Object[] doStorageOperation() {
return executeQuery(tableName, columnNames, predicate,
ordering, rowMapper);
}
});
return future;
}
@Override
public Future<?> insertRowAsync(final String tableName,
final Map<String,Object> values) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
insertRow(tableName, values);
}
}, null);
return future;
}
@Override
public Future<?> updateRowsAsync(final String tableName, final List<Map<String,Object>> rows) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
updateRows(tableName, rows);
}
}, null);
return future;
}
@Override
public Future<?> updateMatchingRowsAsync(final String tableName,
final IPredicate predicate, final Map<String,Object> values) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
updateMatchingRows(tableName, predicate, values);
}
}, null);
return future;
}
@Override
public Future<?> updateRowAsync(final String tableName,
final Object rowKey, final Map<String,Object> values) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
updateRow(tableName, rowKey, values);
}
}, null);
return future;
}
@Override
public Future<?> updateRowAsync(final String tableName,
final Map<String,Object> values) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
updateRow(tableName, values);
}
}, null);
return future;
}
@Override
public Future<?> deleteRowAsync(final String tableName, final Object rowKey) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
deleteRow(tableName, rowKey);
}
}, null);
return future;
}
@Override
public Future<?> deleteRowsAsync(final String tableName, final Set<Object> rowKeys) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
deleteRows(tableName, rowKeys);
}
}, null);
return future;
}
@Override
public Future<?> deleteMatchingRowsAsync(final String tableName, final IPredicate predicate) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
deleteMatchingRows(tableName, predicate);
}
}, null);
return future;
}
@Override
public Future<?> getRowAsync(final String tableName, final Object rowKey) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
getRow(tableName, rowKey);
}
}, null);
return future;
}
@Override
public Future<?> saveAsync(final IResultSet resultSet) {
Future<?> future = executorService.submit(
new StorageRunnable() {
public void doStorageOperation() {
resultSet.save();
}
}, null);
return future;
}
@Override
public void insertRow(String tableName, Map<String, Object> values) {
updateCounters(STORAGE_UPDATE_COUNTER_NAME, tableName);
insertRowImpl(tableName, values);
}
protected abstract void insertRowImpl(String tableName, Map<String, Object> values);
@Override
public void updateRows(String tableName, List<Map<String,Object>> rows) {
updateCounters(STORAGE_UPDATE_COUNTER_NAME, tableName);
updateRowsImpl(tableName, rows);
}
protected abstract void updateRowsImpl(String tableName, List<Map<String,Object>> rows);
@Override
public void updateMatchingRows(String tableName, IPredicate predicate,
Map<String, Object> values) {
updateCounters(STORAGE_UPDATE_COUNTER_NAME, tableName);
updateMatchingRowsImpl(tableName, predicate, values);
}
protected abstract void updateMatchingRowsImpl(String tableName, IPredicate predicate,
Map<String, Object> values);
@Override
public void updateRow(String tableName, Object rowKey,
Map<String, Object> values) {
updateCounters(STORAGE_UPDATE_COUNTER_NAME, tableName);
updateRowImpl(tableName, rowKey, values);
}
protected abstract void updateRowImpl(String tableName, Object rowKey,
Map<String, Object> values);
@Override
public void updateRow(String tableName, Map<String, Object> values) {
updateCounters(STORAGE_UPDATE_COUNTER_NAME, tableName);
updateRowImpl(tableName, values);
}
protected abstract void updateRowImpl(String tableName, Map<String, Object> values);
@Override
public void deleteRow(String tableName, Object rowKey) {
updateCounters(STORAGE_DELETE_COUNTER_NAME, tableName);
deleteRowImpl(tableName, rowKey);
}
protected abstract void deleteRowImpl(String tableName, Object rowKey);
@Override
public void deleteRows(String tableName, Set<Object> rowKeys) {
updateCounters(STORAGE_DELETE_COUNTER_NAME, tableName);
deleteRowsImpl(tableName, rowKeys);
}
protected abstract void deleteRowsImpl(String tableName, Set<Object> rowKeys);
@Override
public void deleteMatchingRows(String tableName, IPredicate predicate) {
IResultSet resultSet = null;
try {
resultSet = executeQuery(tableName, null, predicate, null);
while (resultSet.next()) {
resultSet.deleteRow();
}
resultSet.save();
}
finally {
if (resultSet != null)
resultSet.close();
}
}
@Override
public IResultSet getRow(String tableName, Object rowKey) {
updateCounters(STORAGE_QUERY_COUNTER_NAME, tableName);
return getRowImpl(tableName, rowKey);
}
protected abstract IResultSet getRowImpl(String tableName, Object rowKey);
@Override
public synchronized void addListener(String tableName, IStorageSourceListener listener) {
Set<IStorageSourceListener> tableListeners = listeners.get(tableName);
if (tableListeners == null) {
tableListeners = new CopyOnWriteArraySet<IStorageSourceListener>();
listeners.put(tableName, tableListeners);
}
tableListeners.add(listener);
}
@Override
public synchronized void removeListener(String tableName, IStorageSourceListener listener) {
Set<IStorageSourceListener> tableListeners = listeners.get(tableName);
if (tableListeners != null) {
tableListeners.remove(listener);
}
}
@LogMessageDoc(level="ERROR",
message="Exception caught handling storage notification",
explanation="An unknown error occured while trying to notify" +
" storage listeners",
recommendation=LogMessageDoc.GENERIC_ACTION)
protected synchronized void notifyListeners(StorageSourceNotification notification) {
if (logger.isTraceEnabled()) {
logger.trace("Notifying storage listeneres: {}", notification);
}
String tableName = notification.getTableName();
Set<Object> keys = notification.getKeys();
Set<IStorageSourceListener> tableListeners = listeners.get(tableName);
if (tableListeners != null) {
for (IStorageSourceListener listener : tableListeners) {
try {
switch (notification.getAction()) {
case MODIFY:
listener.rowsModified(tableName, keys);
break;
case DELETE:
listener.rowsDeleted(tableName, keys);
break;
}
}
catch (Exception e) {
logger.error("Exception caught handling storage notification", e);
}
}
}
}
@Override
public void notifyListeners(List<StorageSourceNotification> notifications) {
for (StorageSourceNotification notification : notifications)
notifyListeners(notification);
}
// IFloodlightModule
@Override
public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>();
l.add(IStorageSourceService.class);
return l;
}
@Override
public Map<Class<? extends IFloodlightService>,
IFloodlightService> getServiceImpls() {
Map<Class<? extends IFloodlightService>,
IFloodlightService> m =
new HashMap<Class<? extends IFloodlightService>,
IFloodlightService>();
m.put(IStorageSourceService.class, this);
return m;
}
@Override
public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>();
l.add(IRestApiService.class);
l.add(IDebugCounterService.class);
return l;
}
@Override
public void init(FloodlightModuleContext context)
throws FloodlightModuleException {
restApi =
context.getServiceImpl(IRestApiService.class);
debugCounterService =
context.getServiceImpl(IDebugCounterService.class);
}
@Override
public void startUp(FloodlightModuleContext context) {
restApi.addRestletRoutable(new StorageWebRoutable());
debugCounterService.registerModule(this.getClass().getCanonicalName());
}
}
......@@ -187,6 +187,11 @@ public class MemoryStorageSource extends NoSqlStorageSource {
super.startUp(context);
executorService = new SynchronousExecutorService();
}
@Override
public void init(FloodlightModuleContext context) throws net.floodlightcontroller.core.module.FloodlightModuleException {
super.init(context);
};
@Override
public Map<Class<? extends IFloodlightService>,
......
......@@ -55,8 +55,8 @@ import net.floodlightcontroller.core.OFSwitch;
import net.floodlightcontroller.core.PortChangeType;
import net.floodlightcontroller.core.SwitchDescription;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.debugcounter.DebugCounterServiceImpl;
import net.floodlightcontroller.debugcounter.IDebugCounterService;
import net.floodlightcontroller.debugcounter.MockDebugCounterService;
import net.floodlightcontroller.debugevent.DebugEventService;
import net.floodlightcontroller.debugevent.IDebugEventService;
import net.floodlightcontroller.restserver.IRestApiService;
......@@ -118,7 +118,7 @@ public class OFSwitchManagerTest{
// TODO: should mock IDebugCounterService and make sure
// the expected counters are updated.
DebugCounterServiceImpl debugCounterService = new DebugCounterServiceImpl();
MockDebugCounterService debugCounterService = new MockDebugCounterService();
fmc.addService(IDebugCounterService.class, debugCounterService);
DebugEventService debugEventService = new DebugEventService();
......@@ -141,6 +141,7 @@ public class OFSwitchManagerTest{
syncService.init(fmc);
switchManager.init(fmc);
debugCounterService.init(fmc);
memstorage.init(fmc);
debugEventService.init(fmc);
restApi.init(fmc);
cm.init(fmc);
......@@ -148,6 +149,7 @@ public class OFSwitchManagerTest{
syncService.init(fmc);
switchManager.startUpBase(fmc);
debugCounterService.startUp(fmc);
memstorage.startUp(fmc);
debugEventService.startUp(fmc);
threadPool.startUp(fmc);
restApi.startUp(fmc);
......
......@@ -31,6 +31,8 @@ import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.core.internal.IOFSwitchService;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.debugcounter.IDebugCounterService;
import net.floodlightcontroller.debugcounter.MockDebugCounterService;
import net.floodlightcontroller.packet.ARP;
import net.floodlightcontroller.packet.Data;
import net.floodlightcontroller.packet.Ethernet;
......@@ -78,6 +80,7 @@ public class FirewallTest extends FloodlightTestCase {
protected IPacket tcpPacketReply;
protected IPacket broadcastMalformedPacket;
private Firewall firewall;
private MockDebugCounterService debugCounterService;
public static String TestSwitch1DPID = "00:00:00:00:00:00:00:01";
@Override
......@@ -87,8 +90,9 @@ public class FirewallTest extends FloodlightTestCase {
cntx = new FloodlightContext();
mockFloodlightProvider = getMockFloodlightProvider();
mockSwitchManager = getMockSwitchService();
debugCounterService = new MockDebugCounterService();
firewall = new Firewall();
IStorageSourceService storageService = new MemoryStorageSource();
MemoryStorageSource storageService = new MemoryStorageSource();
RestApiServer restApi = new RestApiServer();
// Mock switches
......@@ -103,16 +107,19 @@ public class FirewallTest extends FloodlightTestCase {
mockSwitchManager.setSwitches(switches);
FloodlightModuleContext fmc = new FloodlightModuleContext();
fmc.addService(IFloodlightProviderService.class,
mockFloodlightProvider);
fmc.addService(IFloodlightProviderService.class, mockFloodlightProvider);
fmc.addService(IDebugCounterService.class, debugCounterService);
fmc.addService(IOFSwitchService.class, mockSwitchManager);
fmc.addService(IFirewallService.class, firewall);
fmc.addService(IStorageSourceService.class, storageService);
fmc.addService(IRestApiService.class, restApi);
debugCounterService.init(fmc);
storageService.init(fmc);
restApi.init(fmc);
firewall.init(fmc);
debugCounterService.startUp(fmc);
storageService.startUp(fmc);
firewall.startUp(fmc);
// Build our test packet
......
......@@ -32,6 +32,8 @@ import net.floodlightcontroller.core.IOFMessageListener;
import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.core.internal.IOFSwitchService;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.debugcounter.IDebugCounterService;
import net.floodlightcontroller.debugcounter.MockDebugCounterService;
import net.floodlightcontroller.packet.Data;
import net.floodlightcontroller.packet.Ethernet;
import net.floodlightcontroller.packet.IPacket;
......@@ -88,6 +90,7 @@ public class LearningSwitchTest extends FloodlightTestCase {
private OFFactory factory = OFFactories.getFactory(OFVersion.OF_13);
private FloodlightModuleContext fmc;
private RestApiServer restApiService;
private MockDebugCounterService debugCounterService;
@Override
@Before
......@@ -162,16 +165,20 @@ public class LearningSwitchTest extends FloodlightTestCase {
.setReason(OFPacketInReason.NO_MATCH)
.build();
this.debugCounterService = new MockDebugCounterService();
this.learningSwitch = new LearningSwitch();
this.restApiService = new RestApiServer();
this.fmc = new FloodlightModuleContext();
fmc.addService(IOFSwitchService.class, getMockSwitchService());
fmc.addService(IFloodlightProviderService.class, getMockFloodlightProvider());
fmc.addService(IDebugCounterService.class, debugCounterService);
fmc.addService(IRestApiService.class, this.restApiService);
this.debugCounterService.init(fmc);
this.restApiService.init(fmc);
this.learningSwitch.init(fmc);
this.debugCounterService.startUp(fmc);
this.restApiService.startUp(fmc);
this.learningSwitch.startUp(fmc);
......
......@@ -49,7 +49,6 @@ import net.floodlightcontroller.debugevent.MockDebugEventService;
import net.floodlightcontroller.linkdiscovery.ILinkDiscoveryListener;
import net.floodlightcontroller.linkdiscovery.ILinkDiscoveryService;
import net.floodlightcontroller.linkdiscovery.LinkInfo;
import net.floodlightcontroller.notification.NotificationManagerFactory;
import net.floodlightcontroller.packet.Data;
import net.floodlightcontroller.packet.Ethernet;
import net.floodlightcontroller.packet.IPacket;
......@@ -139,12 +138,13 @@ public class LinkDiscoveryManagerTest extends FloodlightTestCase {
IDebugEventService debugEventService = new MockDebugEventService();
MockThreadPoolService tp = new MockThreadPoolService();
RestApiServer restApi = new RestApiServer();
MemoryStorageSource storageService = new MemoryStorageSource();
cntx.addService(IRestApiService.class, restApi);
cntx.addService(IThreadPoolService.class, tp);
cntx.addService(IRoutingService.class, routingEngine);
cntx.addService(ILinkDiscoveryService.class, ldm);
cntx.addService(ITopologyService.class, ldm);
cntx.addService(IStorageSourceService.class, new MemoryStorageSource());
cntx.addService(IStorageSourceService.class, storageService);
cntx.addService(IFloodlightProviderService.class, getMockFloodlightProvider());
cntx.addService(IDebugCounterService.class, debugCounterService);
cntx.addService(IDebugEventService.class, debugEventService);
......@@ -152,6 +152,8 @@ public class LinkDiscoveryManagerTest extends FloodlightTestCase {
restApi.init(cntx);
tp.init(cntx);
routingEngine.init(cntx);
storageService.init(cntx);
storageService.startUp(cntx);
ldm.init(cntx);
restApi.startUp(cntx);
tp.startUp(cntx);
......
......@@ -44,6 +44,8 @@ import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.core.internal.IOFSwitchService;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.test.MockFloodlightProvider;
import net.floodlightcontroller.debugcounter.IDebugCounterService;
import net.floodlightcontroller.debugcounter.MockDebugCounterService;
import net.floodlightcontroller.test.FloodlightTestCase;
import net.floodlightcontroller.util.FlowModUtils;
import net.floodlightcontroller.util.MatchUtils;
......@@ -126,10 +128,11 @@ public class StaticFlowTests extends FloodlightTestCase {
private StaticFlowEntryPusher staticFlowEntryPusher;
private IOFSwitchService switchService;
private IOFSwitch mockSwitch;
private MockDebugCounterService debugCounterService;
private Capture<OFMessage> writeCapture;
private Capture<List<OFMessage>> writeCaptureList;
private long dpid;
private IStorageSourceService storage;
private MemoryStorageSource storage;
static {
FlowMod3 = factory.buildFlowModify().build();
TestRule3 = new HashMap<String,Object>();
......@@ -185,9 +188,10 @@ public class StaticFlowTests extends FloodlightTestCase {
@Override
public void setUp() throws Exception {
super.setUp();
debugCounterService = new MockDebugCounterService();
staticFlowEntryPusher = new StaticFlowEntryPusher();
switchService = getMockSwitchService();
storage = createStorageWithFlowEntries();
storage = new MemoryStorageSource();
dpid = HexString.toLong(TestSwitch1DPID);
mockSwitch = createNiceMock(IOFSwitch.class);
......@@ -206,6 +210,7 @@ public class StaticFlowTests extends FloodlightTestCase {
FloodlightModuleContext fmc = new FloodlightModuleContext();
fmc.addService(IStorageSourceService.class, storage);
fmc.addService(IOFSwitchService.class, getMockSwitchService());
fmc.addService(IDebugCounterService.class, debugCounterService);
MockFloodlightProvider mockFloodlightProvider = getMockFloodlightProvider();
Map<DatapathId, IOFSwitch> switchMap = new HashMap<DatapathId, IOFSwitch>();
......@@ -217,7 +222,14 @@ public class StaticFlowTests extends FloodlightTestCase {
fmc.addService(IOFSwitchService.class, switchService);
restApi.init(fmc);
debugCounterService.init(fmc);
storage.init(fmc);
staticFlowEntryPusher.init(fmc);
debugCounterService.init(fmc);
storage.startUp(fmc);
createStorageWithFlowEntries();
staticFlowEntryPusher.startUp(fmc); // again, to hack unittest
}
......@@ -328,10 +340,10 @@ public class StaticFlowTests extends FloodlightTestCase {
IStorageSourceService createStorageWithFlowEntries() {
return populateStorageWithFlowEntries(new MemoryStorageSource());
return populateStorageWithFlowEntries();
}
IStorageSourceService populateStorageWithFlowEntries(IStorageSourceService storage) {
IStorageSourceService populateStorageWithFlowEntries() {
Set<String> indexedColumns = new HashSet<String>();
indexedColumns.add(COLUMN_NAME);
storage.createTable(StaticFlowEntryPusher.TABLE_NAME, indexedColumns);
......
......@@ -18,10 +18,13 @@
package net.floodlightcontroller.storage.memory.tests;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.debugcounter.IDebugCounterService;
import net.floodlightcontroller.debugcounter.MockDebugCounterService;
import net.floodlightcontroller.restserver.IRestApiService;
import net.floodlightcontroller.restserver.RestApiServer;
import net.floodlightcontroller.storage.memory.MemoryStorageSource;
import net.floodlightcontroller.storage.tests.StorageTest;
import org.junit.Before;
public class MemoryStorageTest extends StorageTest {
......@@ -32,6 +35,7 @@ public class MemoryStorageTest extends StorageTest {
restApi = new RestApiServer();
FloodlightModuleContext fmc = new FloodlightModuleContext();
fmc.addService(IRestApiService.class, restApi);
fmc.addService(IDebugCounterService.class, new MockDebugCounterService());
restApi.init(fmc);
storageSource.init(fmc);
restApi.startUp(fmc);
......
......@@ -23,17 +23,17 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.*;
import org.junit.Test;
import net.floodlightcontroller.debugcounter.MockDebugCounterService;
import net.floodlightcontroller.restserver.RestApiServer;
import net.floodlightcontroller.storage.CompoundPredicate;
import net.floodlightcontroller.storage.IStorageExceptionHandler;
......@@ -148,6 +148,7 @@ public abstract class StorageTest extends FloodlightTestCase {
indexedColumnNames.add(PERSON_FIRST_NAME);
indexedColumnNames.add(PERSON_LAST_NAME);
storageSource.setExceptionHandler(null);
storageSource.setDebugCounterService(new MockDebugCounterService());
storageSource.createTable(PERSON_TABLE_NAME, indexedColumnNames);
storageSource.setTablePrimaryKeyName(PERSON_TABLE_NAME, PERSON_SSN);
initPersons();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment