Spaces:
Running
Running
from langflow.utils.version import get_version_info | |
from .model import Flow | |
def get_webhook_component_in_flow(flow_data: dict):
    """Get webhook component in flow data.

    Walks ``flow_data["nodes"]`` in order and returns the first node whose
    ``"id"`` contains the substring ``"Webhook"``.

    Args:
        flow_data: Flow payload; its ``"nodes"`` entry is iterated as a list
            of node dicts. A falsy value (``None`` or ``{}``) is accepted and
            treated as "no nodes".

    Returns:
        The first matching node dict, or ``None`` when there is no match.
    """
    if not flow_data:
        return None
    # `or []` also covers a "nodes" key that is present but null.
    for node in flow_data.get("nodes") or []:
        # `or ""` keeps a node with a missing/null id from raising TypeError
        # in the substring test; such a node simply never matches.
        if "Webhook" in (node.get("id") or ""):
            return node
    return None
def get_all_webhook_components_in_flow(flow_data: dict | None): | |
"""Get all webhook components in flow data.""" | |
if not flow_data: | |
return [] | |
return [node for node in flow_data.get("nodes", []) if "Webhook" in node.get("id")] | |
def get_components_versions(flow: Flow) -> dict[str, str]:
    """Map node ids to the ``lf_version`` recorded on each node.

    For every entry in ``flow.data["nodes"]`` the value at
    ``node["data"]["node"]["lf_version"]`` is collected when that key exists.
    Nodes without an ``lf_version`` are skipped.

    Args:
        flow: Object whose ``data`` attribute is either ``None`` or a dict
            with an optional ``"nodes"`` list.

    Returns:
        A dict keyed by ``node["id"]`` with the stored ``lf_version`` as the
        value. Empty when ``flow.data`` is ``None`` or no node carries a
        version.

    Raises:
        KeyError: If a node that has an ``lf_version`` has no ``"id"`` key.
    """
    versions: dict[str, str] = {}
    if flow.data is None:
        return versions
    for node in flow.data.get("nodes", []):
        # `.get(key, {})` only covers a *missing* key; `or {}` also covers a
        # key that is present with a null value, which would otherwise raise
        # AttributeError on the chained `.get`.
        data_node = (node.get("data") or {}).get("node") or {}
        if "lf_version" in data_node:
            versions[node["id"]] = data_node["lf_version"]
    return versions
def get_outdated_components(flow: Flow) -> list[str]:
    """List the ids of nodes whose recorded version differs from the current one.

    Compares each version returned by ``get_components_versions(flow)``
    against ``get_version_info()["version"]`` using plain inequality, so any
    difference (older or newer) counts as outdated.

    Args:
        flow: The flow whose nodes are inspected.

    Returns:
        Node ids, in the iteration order of ``get_components_versions``, whose
        stored ``lf_version`` is not equal to the current version string.
    """
    component_versions = get_components_versions(flow)
    lf_version = get_version_info()["version"]
    return [
        component_id
        for component_id, version in component_versions.items()
        if version != lf_version
    ]