Skip to content
Snippets Groups Projects
Commit 8339011c authored by meiyangbigswitch's avatar meiyangbigswitch
Browse files

new flow reconciliation design --new files

parent 2986358e
No related branches found
No related tags found
No related merge requests found
/**
* Copyright 2013, Big Switch Networks, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
**/
package net.floodlightcontroller.flowcache;
import net.floodlightcontroller.core.FloodlightContextStore;
import net.floodlightcontroller.flowcache.PriorityPendingQueue.EventPriority;
import net.floodlightcontroller.core.module.IFloodlightService;
/**
* The Interface IFlowReconcileEngine.
*
* public interface APIs to Big Switch Flow-Reconcile Service. FlowReconcileEngine queries
* the network-level flows that are currently deployed in the underlying
* network. The flow reconcile engine can be triggered using various filters by using the
* corresponding APIs.
*
* @author MeiYang
*/
public interface IFlowReconcileEngineService extends IFloodlightService {
/* public static final String FLOWCACHE_APP_NAME =
"net.floodlightcontroller.flowcache.appName";
public static final String FLOWCACHE_APP_INSTANCE_NAME =
"net.floodlightcontroller.flowcache.appInstanceName";
*/
/**
* The flow reconcile engine trigger event type indicating the event that triggered the
* query. The callerOpaqueObj can be keyed based on this event type
*/
public static enum FCQueryEvType {
/** The GET query. Flows need not be reconciled for this query type */
GET,
/** A new App was added. */
BVS_ADDED,
/** An App was deleted. */
BVS_DELETED,
/** Interface rule of an app was modified */
BVS_INTERFACE_RULE_CHANGED_MATCH_SWITCH_PORT,
/** Some App configuration was changed */
BVS_PRIORITY_CHANGED,
/** ACL configuration was changed */
ACL_CONFIG_CHANGED,
/** VRS routing rule was changed */
VRS_ROUTING_RULE_CHANGED,
/** device had moved to a different port in the network */
DEVICE_MOVED,
/** device's property had changed, such as tag assignment */
DEVICE_PROPERTY_CHANGED,
/** Link down */
LINK_DOWN,
/** second round query caused by rewrite flags set */
REWRITE_QUERY,
}
/**
* A FloodlightContextStore object that can be used to interact with the
* FloodlightContext information about flowCache.
*/
public static final FloodlightContextStore<String> fcStore =
new FloodlightContextStore<String>();
/**
* Submit a network flow query with query parameters specified in FCQueryObj
* object. The query object can be created using one of the newFCQueryObj
* helper functions in IFlowCache interface.
*
* The queried flows are returned via the flowQueryRespHandler() callback
* that the caller must implement. The caller can match the query with
* the response using unique callerOpaqueData which remains unchanged
* in the request and response callback.
*
* @see com.bigswitch.floodlight.flowcache#flowQueryRespHandler
* @param query the flow cache query object as input
*/
public void submitFlowQueryEvent(FCQueryObj query, EventPriority priority);
/**
* Flush Local Counter Updates
*
*/
public void updateFlush();
public void querySwitchFlowTable(long swDpid);
}
package net.floodlightcontroller.flowcache;
import java.util.AbstractQueue;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* PriorityPendingQueue class - This class is a variant implementation for PriorityBlockingQueue
* PriorityBlockingQueue implementation has two problems:
* 1. service for events with the same priority has no guarantee of FIFO sequence. This can be solved by override of comparator though.
* 2. PriorityBlockingQueue is implemented through heap, which has a O(log(n)) complexity for enqueue and dequeue operations.
* to get a O(1) complexity with enqueue and dequeue operations, we propose this PriorityPendingList class.
* <p>
* PriorityPendingQueue has three separate queues: High Priority, Medium Priority and Low Priority.
* the requirements here are:
* 1. dequeue from the Queue will always return the event with the highest priority
* 2. events with the same priority will be dequeued in their inserting order
* 3. enqueue and dequeue have O(1) complexity
*
* current only support offer() and take() methods
*
* @author meiyang
*
*/
public class PriorityPendingQueue<E> extends AbstractQueue<E> {
private LinkedBlockingQueue<E> highPriorityQueue;
private LinkedBlockingQueue<E> mediumPriorityQueue;
private LinkedBlockingQueue<E> lowPriorityQueue;
private final AtomicInteger count = new AtomicInteger(0);
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
private int capacity;
public enum EventPriority {
EVENT_HIGH,
EVENT_MEDIUM,
EVENT_LOW,
}
public PriorityPendingQueue() {
highPriorityQueue= new LinkedBlockingQueue<E>();
mediumPriorityQueue= new LinkedBlockingQueue<E>();
lowPriorityQueue= new LinkedBlockingQueue<E>();
capacity= Integer.MAX_VALUE;
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
try {
while (count.get() == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to a non-interrupted thread
throw ie;
}
x = extract();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = extract();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E peek() {
//todo
return null;
}
public boolean offer(E e, EventPriority p) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
insert(e,p);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
public boolean offer(E e) {
return false;
}
private E extract() {
E first = highPriorityQueue.poll();
if (first==null)
first = mediumPriorityQueue.poll();
if (first==null)
first = lowPriorityQueue.poll();
return first;
}
private void insert(E e, EventPriority p) {
if (p==EventPriority.EVENT_HIGH)
highPriorityQueue.offer(e);
if (p==EventPriority.EVENT_MEDIUM)
mediumPriorityQueue.offer(e);
if (p==EventPriority.EVENT_LOW)
lowPriorityQueue.offer(e);
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
@Override
public Iterator<E> iterator() {
// TODO Auto-generated method stub
return null;
}
@Override
public int size() {
return count.get();
}
@Override
public void clear() {
highPriorityQueue.clear();
mediumPriorityQueue.clear();
lowPriorityQueue.clear();
count.set(0);
}
@Override
public boolean isEmpty() {
return count.get() == 0;
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment