diff --git a/src/main/java/net/floodlightcontroller/flowcache/IFlowReconcileEngineService.java b/src/main/java/net/floodlightcontroller/flowcache/IFlowReconcileEngineService.java new file mode 100644 index 0000000000000000000000000000000000000000..b69951a4cb1781ac87de6df0566c26618d92ad16 --- /dev/null +++ b/src/main/java/net/floodlightcontroller/flowcache/IFlowReconcileEngineService.java @@ -0,0 +1,94 @@ +/** + * Copyright 2013, Big Switch Networks, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + **/ + +package net.floodlightcontroller.flowcache; +import net.floodlightcontroller.core.FloodlightContextStore; +import net.floodlightcontroller.flowcache.PriorityPendingQueue.EventPriority; +import net.floodlightcontroller.core.module.IFloodlightService; + +/** + * The Interface IFlowReconcileEngine. + * + * public interface APIs to Big Switch Flow-Reconcile Service. FlowReconcileEngine queries + * the network-level flows that are currently deployed in the underlying + * network. The flow reconcile engine can be triggered using various filters by using the + * corresponding APIs. + * + * @author MeiYang + */ +public interface IFlowReconcileEngineService extends IFloodlightService { + +/* public static final String FLOWCACHE_APP_NAME = + "net.floodlightcontroller.flowcache.appName"; + public static final String FLOWCACHE_APP_INSTANCE_NAME = + "net.floodlightcontroller.flowcache.appInstanceName"; +*/ + /** + * The flow reconcile engine trigger event type indicating the event that triggered the + * query. The callerOpaqueObj can be keyed based on this event type + */ + public static enum FCQueryEvType { + /** The GET query. Flows need not be reconciled for this query type */ + GET, + /** A new App was added. */ + BVS_ADDED, + /** An App was deleted. */ + BVS_DELETED, + /** Interface rule of an app was modified */ + BVS_INTERFACE_RULE_CHANGED_MATCH_SWITCH_PORT, + /** Some App configuration was changed */ + BVS_PRIORITY_CHANGED, + /** ACL configuration was changed */ + ACL_CONFIG_CHANGED, + /** VRS routing rule was changed */ + VRS_ROUTING_RULE_CHANGED, + /** device had moved to a different port in the network */ + DEVICE_MOVED, + /** device's property had changed, such as tag assignment */ + DEVICE_PROPERTY_CHANGED, + /** Link down */ + LINK_DOWN, + /** second round query caused by rewrite flags set */ + REWRITE_QUERY, + } + /** + * A FloodlightContextStore object that can be used to interact with the + * FloodlightContext information about flowCache. + */ + public static final FloodlightContextStore<String> fcStore = + new FloodlightContextStore<String>(); + /** + * Submit a network flow query with query parameters specified in FCQueryObj + * object. The query object can be created using one of the newFCQueryObj + * helper functions in IFlowCache interface. + * + * The queried flows are returned via the flowQueryRespHandler() callback + * that the caller must implement. The caller can match the query with + * the response using unique callerOpaqueData which remains unchanged + * in the request and response callback. + * + * @see com.bigswitch.floodlight.flowcache#flowQueryRespHandler + * @param query the flow cache query object as input + */ + public void submitFlowQueryEvent(FCQueryObj query, EventPriority priority); + + /** + * Flush Local Counter Updates + * + */ + public void updateFlush(); + public void querySwitchFlowTable(long swDpid); +} diff --git a/src/main/java/net/floodlightcontroller/flowcache/PriorityPendingQueue.java b/src/main/java/net/floodlightcontroller/flowcache/PriorityPendingQueue.java new file mode 100644 index 0000000000000000000000000000000000000000..f4a587e3308edd415bfdc9f44d9f2f1c187efad9 --- /dev/null +++ b/src/main/java/net/floodlightcontroller/flowcache/PriorityPendingQueue.java @@ -0,0 +1,191 @@ +package net.floodlightcontroller.flowcache; + +import java.util.AbstractQueue; +import java.util.Iterator; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + + +/** + * PriorityPendingQueue class - This class is a variant implementation for PriorityBlockingQueue + * PriorityBlockingQueue implementation has two problems: + * 1. service for events with the same priority has no guarantee of FIFO sequence. This can be solved by override of comparator though. + * 2. PriorityBlockingQueue is implemented through heap, which has a O(log(n)) complexity for enqueue and dequeue operations. + * to get a O(1) complexity with enqueue and dequeue operations, we propose this PriorityPendingList class. + * <p> + * PriorityPendingQueue has three separate queues: High Priority, Medium Priority and Low Priority. + * the requirements here are: + * 1. dequeue from the Queue will always return the event with the highest priority + * 2. events with the same priority will be dequeued in their inserting order + * 3. enqueue and dequeue have O(1) complexity + * + * current only support offer() and take() methods + * + * @author meiyang + * + */ +public class PriorityPendingQueue<E> extends AbstractQueue<E> { + private LinkedBlockingQueue<E> highPriorityQueue; + private LinkedBlockingQueue<E> mediumPriorityQueue; + private LinkedBlockingQueue<E> lowPriorityQueue; + private final AtomicInteger count = new AtomicInteger(0); + private final ReentrantLock takeLock = new ReentrantLock(); + private final Condition notEmpty = takeLock.newCondition(); + private final ReentrantLock putLock = new ReentrantLock(); + private final Condition notFull = putLock.newCondition(); + private int capacity; + public enum EventPriority { + EVENT_HIGH, + EVENT_MEDIUM, + EVENT_LOW, + } + public PriorityPendingQueue() { + highPriorityQueue= new LinkedBlockingQueue<E>(); + mediumPriorityQueue= new LinkedBlockingQueue<E>(); + lowPriorityQueue= new LinkedBlockingQueue<E>(); + capacity= Integer.MAX_VALUE; + } + + public E take() throws InterruptedException { + E x; + int c = -1; + final AtomicInteger count = this.count; + final ReentrantLock takeLock = this.takeLock; + takeLock.lockInterruptibly(); + try { + try { + while (count.get() == 0) + notEmpty.await(); + } catch (InterruptedException ie) { + notEmpty.signal(); // propagate to a non-interrupted thread + throw ie; + } + x = extract(); + c = count.getAndDecrement(); + if (c > 1) + notEmpty.signal(); + } finally { + takeLock.unlock(); + } + if (c == capacity) + signalNotFull(); + return x; + } + + public E poll() { + final AtomicInteger count = this.count; + if (count.get() == 0) + return null; + E x = null; + int c = -1; + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); + try { + if (count.get() > 0) { + x = extract(); + c = count.getAndDecrement(); + if (c > 1) + notEmpty.signal(); + } + } finally { + takeLock.unlock(); + } + if (c == capacity) + signalNotFull(); + return x; + } + + public E peek() { + //todo + return null; + } + + public boolean offer(E e, EventPriority p) { + if (e == null) throw new NullPointerException(); + final AtomicInteger count = this.count; + if (count.get() == capacity) + return false; + int c = -1; + final ReentrantLock putLock = this.putLock; + putLock.lock(); + try { + if (count.get() < capacity) { + insert(e,p); + c = count.getAndIncrement(); + if (c + 1 < capacity) + notFull.signal(); + } + } finally { + putLock.unlock(); + } + if (c == 0) + signalNotEmpty(); + return c >= 0; + } + + public boolean offer(E e) { + return false; + } + + private E extract() { + E first = highPriorityQueue.poll(); + if (first==null) + first = mediumPriorityQueue.poll(); + if (first==null) + first = lowPriorityQueue.poll(); + return first; + } + + private void insert(E e, EventPriority p) { + if (p==EventPriority.EVENT_HIGH) + highPriorityQueue.offer(e); + if (p==EventPriority.EVENT_MEDIUM) + mediumPriorityQueue.offer(e); + if (p==EventPriority.EVENT_LOW) + lowPriorityQueue.offer(e); + } + + private void signalNotFull() { + final ReentrantLock putLock = this.putLock; + putLock.lock(); + try { + notFull.signal(); + } finally { + putLock.unlock(); + } + } + + private void signalNotEmpty() { + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); + try { + notEmpty.signal(); + } finally { + takeLock.unlock(); + } + } + + @Override + public Iterator<E> iterator() { + // TODO Auto-generated method stub + return null; + } + + @Override + public int size() { + return count.get(); + } + @Override + public void clear() { + highPriorityQueue.clear(); + mediumPriorityQueue.clear(); + lowPriorityQueue.clear(); + count.set(0); + } + @Override + public boolean isEmpty() { + return count.get() == 0; + } +} \ No newline at end of file