Introduction
Edge computing is ideally suited for querying populations of fast data streams. However, developing and evaluating edge-based query processing algorithms is challenging due to the decentralized setting and the lack of effective tools. For example, current simulators fail to execute real queries. To rectify these shortcomings, we propose TEQ, a Testbed for Edge-based Query processing algorithms, designed expressly to be open, enabling extensible simulation and algorithm deployment, and to be developer-friendly, simplifying otherwise cumbersome simulation, complex algorithm management, and tedious evaluation.
TEQ encompasses novel techniques to achieve these capabilities, including:
- a streamlined framework for implementing decentralized algorithms as modular, Docker-based executables;
- data and query playback controls to enable reproducible and consistent evaluations;
- mechanisms for transforming runtime metrics into system-wide metrics desired by algorithm developers.
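As a rough illustration of the last point, the sketch below rolls per-node runtime samples up into two system-wide figures: overall throughput and an item-weighted mean latency. The `MetricsRollup` and `NodeSample` types and their fields are hypothetical names chosen for this example and are not part of TEQ's API; TEQ's own metric pipeline may work differently.

```java
import java.util.List;

// Hypothetical sketch: these types are NOT part of TEQ's API.
final class MetricsRollup {

    /** One runtime sample reported by a single node (illustrative fields). */
    static final class NodeSample {
        final String nodeName;
        final long processedItems;   // items handled by this node during the window
        final double meanLatencyMs;  // mean per-item latency observed on this node

        NodeSample(String nodeName, long processedItems, double meanLatencyMs) {
            this.nodeName = nodeName;
            this.processedItems = processedItems;
            this.meanLatencyMs = meanLatencyMs;
        }
    }

    /** System-wide throughput: items processed by all nodes per second. */
    static double throughputPerSecond(List<NodeSample> samples, double windowSeconds) {
        long total = 0;
        for (NodeSample s : samples) {
            total += s.processedItems;
        }
        return total / windowSeconds;
    }

    /** Mean latency weighted by item count; returns 0 if no items were processed. */
    static double meanLatencyMs(List<NodeSample> samples) {
        long items = 0;
        double weightedSum = 0.0;
        for (NodeSample s : samples) {
            items += s.processedItems;
            weightedSum += s.meanLatencyMs * s.processedItems;
        }
        return items == 0 ? 0.0 : weightedSum / items;
    }

    public static void main(String[] args) {
        List<NodeSample> samples = List.of(
            new NodeSample("worker-1", 100, 10.0),
            new NodeSample("worker-2", 300, 20.0));
        System.out.println(throughputPerSecond(samples, 2.0)); // prints 200.0
        System.out.println(meanLatencyMs(samples));            // prints 17.5
    }
}
```

Weighting latency by item count keeps a lightly loaded node from skewing the system-wide mean; a plain average over nodes would be a different, equally valid choice depending on what the developer wants to measure.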
TEQ is now open source, and the source code is available on GitHub.
Run TEQ
Requirements
- Java 11 (higher Java versions are not yet supported)
- Maven 3.6+
- Docker
- Node.js 22+ (for frontend development)
- Linux system or WSL (Windows Subsystem for Linux)
Installation Steps
- Clone the project:
    git clone https://github.com/leiyu-dev/docker-TEQ.git
    cd docker-TEQ
- Pull the Docker image:
    docker pull leiyu0503/teq:1.0
- Start the backend project (or execute the run.sh script instead):
    mvn clean compile
    mvn exec:java -Dexec.mainClass="example.Example" -pl Computing
- Launch the web interface (optional):
    cd front/visualizer
    npm install
    npm run dev
Note: If you launch the web interface on a server, you need to forward port 8889 (backend port, can be configured in the configurator) to your local machine. The web interface will be integrated into the Java application in the near future.
Implement Your Own Algorithm
To implement your own algorithm, first create a Simulator object, which drives the entire simulation. The Simulator constructor takes a Network object that serves as the networkHost node; if you need no additional network behavior, use the default Network constructor.
By default the simulator contains four layers. For each layer, you implement an abstract class that describes the data processing logic of that layer in edge computing. The example implements a local STkFIQ model based on Flink; the code in the example folder is short and is a good way to learn the TEQ programming process.
After adding all the required layers to the simulator, call the start method of the Simulator object. The simulator then starts the simulation and runs the algorithm you implemented.
Here is a simple example of how to implement a custom algorithm:
public class Example {
    public static void main(String[] args) {
        // create node parameters
        DockerNodeParameters param = new DockerNodeParameters();
        param.setCpuUsageRate(0.5);
        
        // one node type per layer
        EndDevice endDevice = new EndDevice();
        Coordinator coordinator = new Coordinator();
        Worker worker = new Worker();
        DataCenter dataCenter = new DataCenter();
        
        // layered model
        Layer endDeviceLayer = new Layer(endDevice, 300, "EndDeviceLayer");
        Layer coordinatorLayer = new Layer(coordinator, 1, "CoordinatorLayer");
        Layer workerLayer = new Layer(worker, 3, "WorkerLayer");
        Layer dataCenterLayer = new Layer(dataCenter, 1, "DataCenterLayer");
        
        // register the layers with the simulator, then start it
        Simulator simulator = new Simulator(new Network());
        simulator.addLayer(endDeviceLayer);
        simulator.addLayer(coordinatorLayer);
        simulator.addLayer(workerLayer);
        simulator.addLayer(dataCenterLayer);
        
        simulator.start();
    }
}
// End Device Layer Implementation
public class EndDevice extends AbstractEndDeviceNode {
    @Override
    public void process(Object input, Object output, String pipe) {
        if (pipe == null) {
            // No inbound pipe: send data/query via the E2C pipe
            emit(input, "E2C");
        } else if (pipe.equals("C2E")) {
            // Display query results (else-if avoids calling equals on a null pipe)
            displayResults(input);
        }
    }
}
// Worker Layer Implementation  
public class Worker extends AbstractWorkerNode {
    @Override
    public void process(Object input, Object output, String pipe) {
        if (pipe.equals("C2W")) {
            // Local Top-k finding
            List<Item> localTopK = findLocalTopK(input);
            send(localTopK, "W2C");
        }
    }
}
// Coordinator Layer Implementation
public class Coordinator extends AbstractCoordinatorNode {
    @Override  
    public void process(Object input, Object output, String pipe) {
        if (pipe.equals("E2C")) {
            // Map data/query to workers
            mapToWorkers(input, "C2W");
        }
        if (pipe.equals("W2C")) {
            // Map results back to end devices
            mapToEndDevices(input, "C2E");
        }
    }
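}
Custom Flink Processing Node
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CustomFlinkNode extends AbstractFlinkNode {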
    @Override
    public void flinkProcess() {
        StreamExecutionEnvironment env = getEnv();
        DataStream<String> dataStream = env.readTextFile("./data.txt");
        
        // Custom data processing logic
        DataStream<ProcessedData> processedStream = dataStream
            .map(new MapFunction<String, ProcessedData>() {
                @Override
                public ProcessedData map(String value) throws Exception {
                    // Process individual data item
                    return processDataItem(value);
                }
            });
            
        processedStream.print();
    }
}
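The Worker and Coordinator examples above refer to helpers such as findLocalTopK without showing them. The following is a minimal, self-contained sketch of the underlying idea, assuming items are ranked by a numeric score: each worker keeps its k best items, and the coordinator merges the per-worker lists and selects the top k again. The `TopKSketch` class, the `Item` fields, and the method names are illustrative assumptions, not TEQ's actual implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: not TEQ's actual Top-k implementation.
final class TopKSketch {

    /** A scored item; the fields are assumptions made for this example. */
    static final class Item {
        final String id;
        final double score;

        Item(String id, double score) {
            this.id = id;
            this.score = score;
        }
    }

    /** Worker step: return the k highest-scoring items, highest first. */
    static List<Item> topK(List<Item> items, int k) {
        return items.stream()
            .sorted(Comparator.comparingDouble((Item i) -> i.score).reversed())
            .limit(k)
            .collect(Collectors.toList());
    }

    /** Coordinator step: merge the per-worker lists and re-select the top k. */
    static List<Item> mergeTopK(List<List<Item>> localResults, int k) {
        List<Item> merged = new ArrayList<>();
        for (List<Item> local : localResults) {
            merged.addAll(local);
        }
        return topK(merged, k);
    }

    public static void main(String[] args) {
        List<Item> worker1 = List.of(new Item("a", 9), new Item("b", 4), new Item("c", 7));
        List<Item> worker2 = List.of(new Item("d", 8), new Item("e", 1));

        List<Item> global = mergeTopK(List.of(topK(worker1, 2), topK(worker2, 2)), 2);
        for (Item item : global) {
            System.out.println(item.id + " " + item.score); // prints "a 9.0" then "d 8.0"
        }
    }
}
```

When ranking by per-item score, merging local top-k lists gives the exact global top-k, because any item in the global top-k must also be in the top-k of its own worker. This does not hold automatically for frequency-based ranking, where counts for the same item are spread across workers.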