Skip to content

Add linux sockscan plugin #1120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 27 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
aae3f5b
Linux: add first draft of sockscan plugin
eve-mem Mar 26, 2024
d6afbc0
Merge branch 'volatilityfoundation:develop' into linux_sockscan
eve-mem Mar 27, 2024
67291ce
Linux: update sockscan to scan memory layer only once for needles
eve-mem Mar 27, 2024
4f1f480
Linux: add test for sockscan plugin
eve-mem Mar 27, 2024
2c8b757
Linux: update sockscan family check to use 'is None' rather than '== …
eve-mem Mar 28, 2024
f3f5650
Linux: update sockscan with checks to reduce possible duplication of …
eve-mem Mar 28, 2024
fb67d63
Linux: update tests for sockscan to be more generic
eve-mem Mar 28, 2024
3fa6306
Update volatility3/framework/plugins/linux/sockscan.py
ikelos Apr 3, 2024
e4ea19d
Linux: sockscan update based on comments from @gcmoreira, add version…
eve-mem Aug 2, 2024
a3393a9
Linux: fix black formatting issues
eve-mem Aug 2, 2024
3c37732
Linux: Begin to breakdown the functions in sockscan plugin
eve-mem Aug 2, 2024
8a3f6ee
Merge branch 'volatilityfoundation:develop' into linux_sockscan
eve-mem Sep 27, 2024
dcded3a
Update test/test_volatility.py
eve-mem Apr 24, 2025
0fbe039
Update volatility3/framework/plugins/linux/sockscan.py
eve-mem Apr 24, 2025
95c5435
Update volatility3/framework/plugins/linux/sockscan.py
eve-mem Apr 24, 2025
40144e7
Merge branch 'develop' into linux_sockscan
eve-mem Apr 24, 2025
5e549b2
Linux: update sockstat requirements
eve-mem Apr 24, 2025
1a57a66
Linux: update sockstat ruff issue with f-string using no placeholders
eve-mem Apr 24, 2025
e55ab33
Linux: update tests for sockscan
eve-mem Apr 24, 2025
ca584cc
Linux: Update linux sockscan to work with SockHandlers v4.0.0
eve-mem Apr 24, 2025
704f0f0
Linux: Fix incorrect first parameter name in test_linux_sockscan
eve-mem Apr 24, 2025
d532f37
Linux: sockstat contiuned work to breakdown large generator function
eve-mem Apr 24, 2025
787f3b6
Revert "Linux: sockstat contiuned work to breakdown large generator f…
eve-mem Apr 25, 2025
ab5dac1
Linux: fix virtual to physical offsets for sockscan file_ops method
eve-mem Apr 25, 2025
0d32cac
Linux: fix sockscan use of direct imports of TreeGrid and NotAvailabl…
eve-mem Apr 25, 2025
6f11161
Linux: add MultiStringScanner requirement for sockscan
eve-mem Apr 25, 2025
7cfc03d
Linux: update sockscan to use minor version for the requirements this…
eve-mem Apr 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion test/test_volatility.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def test_windows_thrdscan(image, volatility, python):
assert out.find(b"\t4\t8") != -1
assert out.find(b"\t4\t12") != -1
assert out.find(b"\t4\t16") != -1
#assert out.find(b"this raieses AssertionError") != -1
# assert out.find(b"this raieses AssertionError") != -1
assert rc == 0


Expand Down Expand Up @@ -368,6 +368,27 @@ def test_linux_library_list(image, volatility, python):
assert rc == 0


def test_linux_sockscan(image, volatility, python):
# designed for linux-sample-1.dmp SHA1:1C3A4627EDCA94A7ADE3414592BEF0E62D7D3BB6
rc, out, err = runvol_plugin("linux.sockscan.Sockscan", image, volatility, python)

assert re.search(
rb"AF_UNIX\s+STREAM\s+-\s+/tmp/pulse-JldaJj8OxQLa/native\s+14054\s+-\s+14053\s+ESTABLISHED\s+-",
out,
)
assert re.search(
rb"AF_INET\s+STREAM\s+TCP\s+192.168.201.161\s+22\s+192.168.201.1\s+59982\s+ESTABLISHED\s+-",
out,
)
assert re.search(
rb"AF_INET\s+STREAM\s+TCP\s+0.0.0.0\s+901\s+0.0.0.0\s+0\s+LISTEN\s+-",
out,
)

assert out.count(b"\n") >= 50
assert rc == 0


# MAC


Expand Down
350 changes: 350 additions & 0 deletions volatility3/framework/plugins/linux/sockscan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,350 @@
# This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import logging
import struct
from typing import List, Set

from volatility3.framework import exceptions, constants
from volatility3.framework.renderers import TreeGrid, NotAvailableValue, format_hints
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
from volatility3.framework.symbols import linux
from volatility3.plugins.linux import sockstat
from volatility3.framework import symbols
from volatility3.framework import symbols, constants
from volatility3.framework.layers import scanners

vollog = logging.getLogger(__name__)


class Sockscan(plugins.PluginInterface):
"""Scans for network connections found in memory layer."""

_required_framework_version = (2, 6, 0)

@classmethod
def get_requirements(cls):
return [
requirements.ModuleRequirement(
name="kernel",
description="Linux kernel",
architectures=["Intel32", "Intel64"],
),
requirements.VersionRequirement(
name="SockHandlers", component=sockstat.SockHandlers, version=(1, 0, 0)
),
requirements.VersionRequirement(
name="linuxutils", component=linux.LinuxUtilities, version=(2, 0, 0)
),
]

def _canonicalize_symbol_addrs(
self, symbol_table_name: List[str], symbol_names: str
) -> Set[bytes]:
"""Takes a list of symbol names and converts the address of each to the bytes
as they would appear in memory so that they can be scanned for.

Symbols that cannot be found are ignored and not included in the results.

Args:
symbol_table_name: The name of the kernel module on which to operate
symbol_names: A list of symbol names to be looked up

Returns:
A set of bytes which are the packed addresses.
"""
# get vmlinux module from context in order to build objects and read symbols
vmlinux = self.context.modules[symbol_table_name]

# get kernel layer from context so that it's dependencies can be found, and therefore scanned.
# kernel layer will be virtual and built ontop of a physical layer.
kernel_layer = self.context.layers[vmlinux.layer_name]

# detmine if kernel is 64bit or not. The plugin scans for pointers and these need to formated
# to the correct size so that they can be accurately located in the physical layer.
if symbols.symbol_table_is_64bit(self.context, vmlinux.symbol_table_name):
pack_format = "Q" # 64 bit
else:
pack_format = "I" # 32 bit

packed_needles = set()
for symbol_name in symbol_names:
try:
needle_addr = vmlinux.object_from_symbol(symbol_name).vol.offset
except exceptions.SymbolError:
vollog.log(
constants.LOGLEVEL_VVVV,
f"Unable to find symbol {symbol_name} this will not be scanned for.",
)
continue
# use canonicalize to set the appropriate sign extension for the addr
addr = kernel_layer.canonicalize(needle_addr)
packed_addr = struct.pack(pack_format, addr)
packed_needles.add(packed_addr)
vollog.log(
constants.LOGLEVEL_VVVV,
f"Will scan for {symbol_name} using the bytes: {packed_addr.hex()}",
)

# make a warning if no symbols at all could be resolved.
if len(packed_needles) == 0:
vollog.warning(
f"_canonicalize_symbol_addrs was unable to resolve any symbols, use -vvvv for more information."
)

return packed_needles

def _generator(self, symbol_table_name: str):
"""Scans for sockets. Each row represents a kernel socket.

Args:
symbol_table_name: The name of the kernel module on which to operate

Yields:
addr: Physical offset
family: Socket family string (AF_UNIX, AF_INET, etc)
sock_type: Socket type string (STREAM, DGRAM, etc)
protocol: Protocol string (UDP, TCP, etc)
source addr: Source address string
source port: Source port string (not all of them are int)
destination addr: Destination address string
destination port: Destination port (not all of them are int)
state: State strings (LISTEN, CONNECTED, etc)
"""

# get vmlinux module from context in order to build objects and read symbols
vmlinux = self.context.modules[symbol_table_name]

# get kernel layer from context so that it's dependencies can be found, and therefore scanned.
# kernel layer will be virtual and built ontop of a physical layer.
kernel_layer = self.context.layers[vmlinux.layer_name]

# TODO: Update plugin to support multiple dependencies. e.g. a memory layer and swap file.
# This is a shared problem with psscan and having a generic solution would be useful.

# Find the memory layer to scan, and provide warnings if more than one is located.
if len(kernel_layer.dependencies) > 1:
vollog.warning(
f"Kernel layer depends on multiple layers however only {kernel_layer.dependencies[0]} will be scanned by this plugin."
)
elif len(kernel_layer.dependencies) == 0:
vollog.error(
f"Kernel layer has no dependencies, meaning there is no memory layer for this plugin to scan."
)
raise exceptions.LayerException(
vmlinux.layer_name, f"Layer {vmlinux.layer_name} has no dependencies"
)
memory_layer_name = kernel_layer.dependencies[0]
memory_layer = self.context.layers[kernel_layer.dependencies[0]]

# use the init process to build a sock handler
# TODO: look into options so that sockstat.SockHandlers so that process_sock can
# be used without a task object.
init_task = vmlinux.object_from_symbol(symbol_name="init_task")
sock_handler = sockstat.SockHandlers(vmlinux, init_task)

# get progress_callback in order to use this in the scanners.
# TODO: perhaps add more detail to progress, showing method in progress and number of hits found
progress_callback = self._progress_callback

# Method 1 - find sockets by file operations, then follow pointers to sockets
file_ops_symbol_names = [
"socket_file_ops",
"sockfs_dentry_operations",
]
file_ops_needles = self._canonicalize_symbol_addrs(
symbol_table_name, file_ops_symbol_names
)
# get file struct to find the offset to the f_op pointer
# this is so that the file object can be created at the correct offset,
# the results of the scanner will be for the f_op member within the file
f_op_offset = vmlinux.get_type("file").members["f_op"][0]

# Method 2 - find sockets by socket destructor directly inside sock objects
socket_destructor_symbol_names = [
"sock_def_destruct",
"packet_sock_destruct",
"unix_sock_destructor",
"netlink_sock_destruct",
"inet_sock_destruct",
]
socket_destructor_needles = self._canonicalize_symbol_addrs(
symbol_table_name, socket_destructor_symbol_names
)
# get sock struct to find the offset to the sk_destruct pointer
# this is so that the sock object can be created at the correct offset,
# the results of the scanner will be for the sk_destruct member within the scock
sk_destruct_offset = vmlinux.get_type("sock").members["sk_destruct"][0]

# TODO Method 3 - find sock by sk_error_report symbols
# sk_error_report_symbol_names = ['sock_def_error_report', 'inet_sk_rebuild_header', 'inet_listen']
# this would be similar to Method 2, but using a different pointer within sock.

# Using the calculated needles, scan the memory layer and attempt to parse the sockets located.
for needle_addr, match in memory_layer.scan(
self.context,
scanners.MultiStringScanner(socket_destructor_needles | file_ops_needles),
progress_callback,
):
psock = None
sock_physical_addr = None

# if match is from socket_destructor_needles simply calculate the offset
# to the sock
if match in socket_destructor_needles:
sock_physical_addr = needle_addr - sk_destruct_offset
psock = self.context.object(
vmlinux.symbol_table_name + constants.BANG + "sock",
offset=sock_physical_addr,
layer_name=memory_layer_name,
native_layer_name=vmlinux.layer_name,
)

# if match is from file_ops_needles attempt to walk from file object to
# the sock
if match in file_ops_needles:
try:
# create file in the memory_layer, the native layer matches the
# kernel so that pointers can be followed
sock_physical_addr = needle_addr - f_op_offset
pfile = self.context.object(
vmlinux.symbol_table_name + constants.BANG + "file",
offset=sock_physical_addr,
layer_name=memory_layer_name,
native_layer_name=vmlinux.layer_name,
)
dentry = pfile.get_dentry()
if not dentry:
vollog.log(
constants.LOGLEVEL_VVVV,
f"Skipping file at {hex(needle_addr)} as unable to locate dentry",
)
continue

d_inode = dentry.d_inode
if not d_inode:
vollog.log(
constants.LOGLEVEL_VVVV,
f"Skipping file at {hex(needle_addr)} as unable to locate inode for dentry",
)
continue

socket_alloc = linux.LinuxUtilities.container_of(
d_inode, "socket_alloc", "vfs_inode", vmlinux
)
socket = socket_alloc.socket
if not (socket and socket.sk):
vollog.log(
constants.LOGLEVEL_VVVV,
f"Skipping file at {hex(needle_addr)} as socket created by LinuxUtilities.container_of is invalid",
)
continue

# sucessfully trversed from file to sock, this will exist in the
# kernel layer, and need to be translated to the memory layer.
psock = socket.sk.dereference()
except exceptions.InvalidAddressException as error:
vollog.log(
constants.LOGLEVEL_VVVV,
f"Unable to follow file at {hex(needle_addr)} to socket due to invalid address: {error}",
)

if psock is not None:
try:
sock_type = psock.get_type()

family = psock.get_family()
# remove results with no family
if family == None:
vollog.log(
constants.LOGLEVEL_VVVV,
f"Skipping socket at {hex(sock_physical_addr)} as unable to determin family.",
)
continue

# TODO: invesitgate options for more invalid address handling in proccess_sock
# and the later formatting of it's results.
sock_fields = sock_handler.process_sock(psock)
# if no sock_fields we're able to be extracted then skip this result.
if not sock_fields:
vollog.log(
constants.LOGLEVEL_VVVV,
f"Skipping socket at {hex(sock_physical_addr)} as unable to process with SockHandlers.",
)
continue

sock, sock_stat, extended = sock_fields
src, src_port, dst, dst_port, state = sock_stat
protocol = sock.get_protocol()

# format results
src = NotAvailableValue() if src is None else str(src)
src_port = (
NotAvailableValue() if src_port is None else str(src_port)
)
dst = NotAvailableValue() if dst is None else str(dst)
dst_port = (
NotAvailableValue() if dst_port is None else str(dst_port)
)
state = NotAvailableValue() if state is None else str(state)
protocol = (
NotAvailableValue() if protocol is None else str(protocol)
)
# extended attributes is a dict, so this is formated to string show each
# key and value pair, seperated with a comma.
socket_filter_str = (
",".join(f"{k}={v}" for k, v in extended.items())
if extended
else NotAvailableValue()
)

# remove empty results
if (src == "0.0.0.0" or isinstance(src, NotAvailableValue)) and (
dst == "0.0.0.0" or isinstance(src, NotAvailableValue)
):
if state == "UNCONNECTED":
continue
elif src_port == "0" and dst_port == "0":
continue

fields = (
format_hints.Hex(sock_physical_addr),
family,
sock_type,
protocol,
src,
src_port,
dst,
dst_port,
state,
socket_filter_str,
)

yield (0, fields)
except exceptions.InvalidAddressException as error:
vollog.log(
constants.LOGLEVEL_VVVV,
f"Unable create results for socket at {hex(sock_physical_addr)} due to invalid address: {error}",
)

def run(self):

tree_grid_args = [
("Sock Offset", format_hints.Hex),
("Family", str),
("Type", str),
("Proto", str),
("Source Addr", str),
("Source Port", str),
("Destination Addr", str),
("Destination Port", str),
("State", str),
("Filter", str),
]

return TreeGrid(
tree_grid_args,
self._generator(self.config["kernel"]),
)
Loading