144 lines
5.2 KiB
Python
144 lines
5.2 KiB
Python
from flask import Flask,jsonify,request,abort, make_response,send_file
|
|
from werkzeug.exceptions import HTTPException
|
|
from nornir import InitNornir
|
|
import os
|
|
from werkzeug.utils import secure_filename
|
|
import json
|
|
|
|
# File extensions (without the leading dot) that allowed_file() accepts.
ALLOWED_EXTENSIONS = {'conf'}
# Directory that uploaded configuration files are written to; copied into
# app.config['UPLOAD_FOLDER'] once the Flask app exists.
UPLOAD_FOLDER = 'fastprod/upload_files/'
|
|
|
|
|
|
def init_nornir():
    """Build a Nornir object from the inventory config file and store it.

    The result is kept in ``app.config['nr']``; calling this again replaces
    the previously stored object with a freshly initialised one.
    """
    nornir_instance = InitNornir(config_file="fastprod/inventory/config.yaml")
    app.config['nr'] = nornir_instance
|
|
|
|
|
|
|
|
def allowed_file(filename):
    """Tell whether *filename* ends in an extension from ``ALLOWED_EXTENSIONS``.

    The extension is the text after the last dot and is compared
    case-insensitively. Names that contain no dot are rejected.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1]
    return extension.lower() in ALLOWED_EXTENSIONS
|
|
|
|
# Create the Flask application and record where uploaded files are stored.
app = Flask(__name__)
app.config.update(UPLOAD_FOLDER=UPLOAD_FOLDER)

# NOTE(review): these imports sit below the creation of ``app`` -- presumably
# because the service modules need it to exist first; confirm before moving
# them to the top of the file.
from services.devices import (
    get_inventory,
    add_device,
    get_device_by_name,
    delete_device,
    get_device_interfaces,
    get_device_interfaces_ip,
    get_device_technical_info,
)
from services.config import (
    get_config_by_device,
    run_config_from_file_by_device,
)
from services.tasks import (
    run_show_commands_by_device,
    run_config_commands_by_device,
)
from services.snapshots import (
    get_snapshot_by_name,
    create_snapshot_by_device,
    get_snapshots_by_device,
)

# Initialise Nornir once at import time so app.config['nr'] is populated.
init_nornir()
|
|
|
|
@app.route("/")
def hello_world():
    """Return static identification data for this backend as JSON."""
    service_info = {
        "env": "DEV",
        "name": "fastprod_backend",
        "version": 1.0,
    }
    return jsonify(service_info)
|
|
|
|
@app.route("/devices", methods=['GET', 'POST'])
def devices():
    """List the inventory (GET) or add a device to it (POST).

    GET re-initialises Nornir, then returns ``devices`` together with
    ``total_count``. POST passes the request's JSON body to ``add_device``
    and returns its result under ``device``.
    """
    method = request.method

    if method == 'GET':
        # Re-run the Nornir initialisation before reading the inventory.
        init_nornir()
        inventory = get_inventory()
        return jsonify(devices=inventory, total_count=len(inventory))

    if method == 'POST':
        payload = request.get_json()
        created = add_device(payload)
        return jsonify(device=created)

    # NOTE(review): any other method reaching this view falls through and
    # returns None, exactly as before.
|
|
|
|
@app.route("/devices/<device_name>", methods=['GET', 'DELETE'])
def device_by_name(device_name):
    """Return (GET) or delete (DELETE) the device called *device_name*.

    Both verbs first resolve the device through ``get_device_by_name``.
    """
    verb = request.method

    if verb == 'GET':
        found = get_device_by_name(device_name)
        return jsonify(device=found)

    if verb == 'DELETE':
        target = get_device_by_name(device_name)
        delete_device(target)
        return jsonify(message="Device deleted")

    # Other methods fall through and return None, as in the original code.
|
|
|
|
|
|
@app.route("/devices/<device_name>/interfaces", methods=['GET'])
def get_interfaces(device_name):
    """Return the interfaces of the named device under the ``interfaces`` key."""
    target = get_device_by_name(device_name)
    return jsonify(interfaces=get_device_interfaces(target))
|
|
|
|
@app.route("/devices/<device_name>/interfaces/ip", methods=['GET'])
def get_interfaces_ip(device_name):
    """Return the interface IP data of the named device under ``interfaces_ip``."""
    if request.method != 'GET':
        # Preserve the original fall-through for any non-GET method.
        return None

    target = get_device_by_name(device_name)
    ip_data = get_device_interfaces_ip(target)
    return jsonify(interfaces_ip=ip_data)
|
|
|
|
@app.route("/devices/<device_name>/facts", methods=['GET'])
def get_technical_info(device_name):
    """Return the technical info of the named device as JSON.

    NOTE(review): the payload is published under the key ``interfaces_ip``
    although it holds technical info -- this looks like a copy-paste
    leftover; confirm with API consumers before renaming it.
    """
    if request.method != 'GET':
        # Preserve the original fall-through for any non-GET method.
        return None

    target = get_device_by_name(device_name)
    facts = get_device_technical_info(target)
    return jsonify(interfaces_ip=facts)
|
|
|
|
@app.route("/devices/<device_name>/config", methods=['GET','POST'])
def get_config(device_name):
    """Read a device's configuration (GET) or push changes to it (POST).

    A POST request is handled in one of two ways:

    * multipart upload with a ``config_file`` part: the file must pass
      ``allowed_file`` (otherwise 403), is saved under
      ``app.config['UPLOAD_FOLDER']`` and handed to
      ``run_config_from_file_by_device``;
    * JSON body with ``mode`` and ``commands``: ``mode == 'enable'`` goes to
      ``run_show_commands_by_device``, ``mode == 'config'`` goes to
      ``run_config_commands_by_device``.

    A POST without a usable JSON object, or with an unknown ``mode``, is
    answered with HTTP 400 and a JSON ``message`` instead of failing with an
    unhandled exception or a ``None`` return value.
    """
    device = get_device_by_name(device_name)

    if request.method == 'POST':
        # Membership test rather than truthiness of the file object, so an
        # upload part with an empty filename is still routed to the
        # extension check below.
        if 'config_file' in request.files:
            upload = request.files['config_file']
            if not allowed_file(upload.filename):
                abort(make_response(jsonify(message="file extension not allowed"), 403))
            filename = secure_filename(upload.filename)
            saved_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            upload.save(saved_path)
            result = run_config_from_file_by_device(device, file_path=saved_path)
            return jsonify(result=result)

        # Parse the body once; silent=True yields None for a missing or
        # malformed JSON body instead of raising.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            abort(make_response(
                jsonify(message="expected a 'config_file' upload or a JSON object body"),
                400,
            ))

        mode = payload.get('mode')
        commands = payload.get('commands')

        if mode == 'enable':
            result = run_show_commands_by_device(device, commands=commands)
            return jsonify(result=result)

        if mode == 'config':
            result = run_config_commands_by_device(device, commands=commands)
            return jsonify(result=result, commands=commands)

        abort(make_response(jsonify(message="mode must be 'enable' or 'config'"), 400))

    # Every non-POST method accepted by the route is a read.
    config = get_config_by_device(device)
    # NOTE(review): the key ``interfaces_ip`` is kept because clients may
    # depend on it, although the value is the device configuration.
    return jsonify(interfaces_ip=config)
|
|
|
|
@app.route("/devices/<device_name>/snapshots", methods=['GET','POST'])
def snapshot(device_name):
    """List the snapshots of a device (GET) or create a new one (POST).

    Both verbs answer with ``{"result": True, "data": ...}``. For GET,
    ``data`` is whatever ``get_snapshots_by_device`` returns; for POST it is
    ``{"snapshot_path": <value returned by create_snapshot_by_device>}``.
    """
    device = get_device_by_name(device_name)
    print(device)

    verb = request.method

    if verb == 'GET':
        listing = get_snapshots_by_device(device)
        return jsonify(result=True, data=listing)

    if verb == 'POST':
        created_path = create_snapshot_by_device(device)
        return jsonify(result=True, data={"snapshot_path": created_path})

    # Other methods fall through and return None, as in the original code.
|
|
|
|
@app.route("/snapshots/<path:filename>", methods=['GET'])
def snapshot_by_name(filename):
    """Send the snapshot file located at *filename* inside ``snapshots/``.

    *filename* comes straight from the URL and may contain slashes, so it is
    served through ``send_from_directory``, which refuses paths that resolve
    outside the ``snapshots`` directory. A missing file or a rejected path
    yields the same 404 JSON response: ``{"message": "snapshot not found"}``.
    """
    # Imported here so this handler does not depend on changes to the
    # module-level import list.
    from flask import send_from_directory
    from werkzeug.exceptions import NotFound

    try:
        return send_from_directory('snapshots', filename)
    except (NotFound, FileNotFoundError):
        # Keep the established error body instead of the generic 404 payload.
        abort(make_response(jsonify(message="snapshot not found"), 404))
|
|
|
|
@app.errorhandler(HTTPException)
def handle_exception(e):
    """Turn an HTTP error into a JSON response instead of an HTML page.

    The response object is taken from the exception itself, so its status
    code and headers are kept; only the body and the content type are
    replaced. The body carries the exception's ``code``, ``name`` and
    ``description``.
    """
    json_response = e.get_response()

    error_body = {
        "code": e.code,
        "name": e.name,
        "description": e.description,
    }
    json_response.data = json.dumps(error_body)
    json_response.content_type = "application/json"

    return json_response
|